## Import Libraries

In [0]:
from pyspark.sql import SparkSession, Window
from datetime import datetime
from pyspark.sql.functions import explode, col, lit, when, to_date, trim, arrays_zip, split, lower, round, explode_outer, regexp_replace, max, coalesce, regexp_extract, substring, instr, length
import builtins
from pyspark.sql.types import *

In [0]:
# DBFS location of the storage mount; this cell only defines the path.
# The unmount and mount cells below both take it as their mount point.
mount_point = "/mnt/adverse-events"

In [0]:
# Unmount only when something is currently mounted at mount_point.
# dbutils.fs.unmount raises if the directory is not mounted, which would abort
# a Run All on a fresh workspace before the mount cell is reached.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

/mnt/adverse-events has been unmounted.


True

## Connect to Storage Account

In [0]:
# OAuth client-credentials settings used to reach the storage account. The client
# secret is read from the "key-vault-scope" secret scope rather than written here.
client_secret = dbutils.secrets.get(scope="key-vault-scope", key="client-secret")
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "e99fd849-a6b3-49c5-b7a0-044016a9b700",
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/62a42a60-1f3b-41ee-92d8-d46ce7b9d9ef/oauth2/token/",
}

# Mount the "adverse-events" container at mount_point using the settings above.
container_uri = "abfss://adverse-events@adverseeventsa.dfs.core.windows.net"
dbutils.fs.mount(source=container_uri, mount_point=mount_point, extra_configs=configs)

True

In [0]:
# List the top-level folders of the mounted container.
display(dbutils.fs.ls("/mnt/adverse-events"))

path,name,size,modificationTime
dbfs:/mnt/adverse-events/raw-data/,raw-data/,0,1728067477000
dbfs:/mnt/adverse-events/transformed-data/,transformed-data/,0,1728067489000


## Read JSON into a Spark DataFrame

In [0]:
# List the raw extracts available under the mounted container.
directory_path = "dbfs:/mnt/adverse-events/raw-data/"
files = dbutils.fs.ls(directory_path)
print(files, end="\n\n")

# `max` is shadowed by pyspark.sql.functions.max, so call the builtin explicitly.
most_recent = builtins.max(files, key=lambda info: info.modificationTime)
latest_file = most_recent.path
print(latest_file)

spark = SparkSession.builder.appName("JSON Parser").getOrCreate()

# NOTE(review): this hard-coded path replaces the file selected above, so the
# notebook always reads the 2024 extract — confirm the override is intentional.
latest_file = 'dbfs:/mnt/adverse-events/raw-data/fda_adverse_events_extract_2024-01-01-TO-2024-12-31.json'
df = spark.read.json(latest_file)

# One output row per element of the top-level "results" array.
exploded_df = df.withColumn("results", explode("results"))


[FileInfo(path='dbfs:/mnt/adverse-events/raw-data/fda_adverse_events_extract_2015-01-01-TO-2015-12-31.json', name='fda_adverse_events_extract_2015-01-01-TO-2015-12-31.json', size=14485018982, modificationTime=1730140145000), FileInfo(path='dbfs:/mnt/adverse-events/raw-data/fda_adverse_events_extract_2016-01-01-TO-2016-12-31.json', name='fda_adverse_events_extract_2016-01-01-TO-2016-12-31.json', size=16645598394, modificationTime=1730135156000), FileInfo(path='dbfs:/mnt/adverse-events/raw-data/fda_adverse_events_extract_2017-01-01-TO-2017-12-31.json', name='fda_adverse_events_extract_2017-01-01-TO-2017-12-31.json', size=17408305231, modificationTime=1730129494000), FileInfo(path='dbfs:/mnt/adverse-events/raw-data/fda_adverse_events_extract_2018-01-01-TO-2018-12-31.json', name='fda_adverse_events_extract_2018-01-01-TO-2018-12-31.json', size=21625503696, modificationTime=1730081327000), FileInfo(path='dbfs:/mnt/adverse-events/raw-data/fda_adverse_events_extract_2019-01-01-TO-2019-12-31.js

## Parse Patient Data into Spark DataFrame for Patient Dimension Table

In [0]:
# Build the patient dimension from the exploded reports: report id, onset age,
# onset-age unit, age group, sex and weight.
dim_patient = exploded_df.select(
    col("results.safetyreportid").alias("patient_id"),
    col("results.patient.patientonsetage").alias("patient_onset_age"),
    col("results.patient.patientonsetageunit").alias("patient_onset_age_unit"),
    col("results.patient.patientagegroup").alias("patient_age_group"),
    col("results.patient.patientsex").alias("patient_sex"),
    col("results.patient.patientweight").alias("patient_weight(kg)")
)

# Cast onset age to integer so it can be rescaled to years.
dim_patient = dim_patient.withColumn("patient_onset_age", col("patient_onset_age").cast(IntegerType()))

# Convert onset age to years. openFDA unit codes:
#   800 = decade, 801 = year, 802 = month, 803 = week, 804 = day, 805 = hour.
# Code 801 (and a missing unit) falls through to `otherwise` and is kept as-is.
# Each code is tested exactly once: days (804) are divided by 365 and hours (805)
# by 8760 (= 24 * 365).
dim_patient = dim_patient.withColumn("patient_onset_age",
                                     when(dim_patient.patient_onset_age_unit == 800, dim_patient.patient_onset_age*10)
                                     .when(dim_patient.patient_onset_age_unit == 802, dim_patient.patient_onset_age/12)
                                     .when(dim_patient.patient_onset_age_unit == 803, dim_patient.patient_onset_age/52)
                                     .when(dim_patient.patient_onset_age_unit == 804, dim_patient.patient_onset_age/365)
                                     .when(dim_patient.patient_onset_age_unit == 805, dim_patient.patient_onset_age/8760)
                                     .otherwise(dim_patient.patient_onset_age))
# Round age (years) to two decimals.
dim_patient = dim_patient.withColumn("patient_onset_age", round(col("patient_onset_age"), 2))

# Derive the age group from the onset age whenever an age is present; the reported
# age-group code ('1'..'6') is only used as a fallback when the age is null.
dim_patient = dim_patient.withColumn("patient_age_group",
                                     when(dim_patient.patient_onset_age < 0.2, 'Neonate')
                                     .when(dim_patient.patient_onset_age < 1, 'Infant')
                                     .when(dim_patient.patient_onset_age < 12, 'Child')
                                     .when(dim_patient.patient_onset_age < 19, 'Adolescent')
                                     .when(dim_patient.patient_onset_age < 66, 'Adult')
                                     .when(dim_patient.patient_onset_age > 65, 'Elderly')
                                     .otherwise(when(dim_patient.patient_age_group == '1', 'Neonate')
                                               .when(dim_patient.patient_age_group == '2', 'Infant')
                                               .when(dim_patient.patient_age_group == '3', 'Child')
                                               .when(dim_patient.patient_age_group == '4', 'Adolescent')
                                               .when(dim_patient.patient_age_group == '5', 'Adult')
                                               .when(dim_patient.patient_age_group == '6', 'Elderly')))

# Decode the sex code; anything other than 1 or 2 (including null) becomes "Unknown".
dim_patient = dim_patient.withColumn("patient_sex",
                                     when(dim_patient.patient_sex == 1, 'Male')
                                     .when(dim_patient.patient_sex == 2, 'Female')
                                     .otherwise("Unknown"))
# Keep only ages up to 120 years.
dim_patient = dim_patient.filter('patient_onset_age <= 120')
# The unit is no longer needed once the age is expressed in years.
dim_patient = dim_patient.drop(col('patient_onset_age_unit'))
dim_patient = dim_patient.dropna(subset=['patient_onset_age'])
dim_patient = dim_patient.withColumn('patient_id', col('patient_id').cast(IntegerType()))
display(dim_patient)

patient_id,patient_onset_age,patient_age_group,patient_sex,patient_weight(kg)
23353582,50.0,Adult,Female,
23353583,89.0,Elderly,Male,
23353586,54.0,Adult,Male,
23353588,61.0,Adult,Male,
23353589,57.0,Adult,Female,
23353590,60.0,Adult,Female,
23353591,83.0,Elderly,Male,
23353592,62.0,Adult,Female,
23353593,84.0,Elderly,Female,
23353595,53.0,Adult,Male,82.0


In [0]:
# Print the column names and types of the patient dimension.
# NOTE(review): the captured output below lists patient_id as string although the
# preceding cell casts it to IntegerType — the output may be stale; re-run to confirm.
dim_patient.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- patient_onset_age: double (nullable = true)
 |-- patient_age_group: string (nullable = true)
 |-- patient_sex: string (nullable = false)
 |-- patient_weight(kg): string (nullable = true)



## Parse Drug Information into Spark DataFrame for Drug Information Dimension Table

In [0]:
# Build the drug dimension: one row per normalized medicinal product name.
# Only medicinal_product, indication, administration_route and dosage_form are
# carried past the zip/explode step below; the other selected columns are dropped there.
dim_drug_information = exploded_df.select(
    col("results.safetyreportid").alias("safety_report_id"),
    col("results.patient.drug.medicinalproduct").alias("medicinal_product"),
    col("results.patient.drug.activesubstance.activesubstancename").alias("active_substance"),
    col("results.patient.drug.drugindication").alias("indication"),
    col("results.patient.drug.drugadministrationroute").alias("administration_route"),
    col("results.patient.drug.drugdosageform").alias("dosage_form"),
    col("results.patient.drug.openfda.pharm_class_pe").alias("pharmacodynamic_effect"),
    col("results.patient.drug.openfda.pharm_class_cs").alias("chemical_structure_class"),
    col("results.patient.drug.openfda.pharm_class_epc").alias("established_pharmacologic_class"),
    col("results.patient.drug.openfda.pharm_class_moa").alias("mechanism_of_action")
)

# Zip the per-report arrays position by position and explode them so every zipped
# entry becomes its own row, then keep one row per raw product name.
dim_drug_information = (dim_drug_information
    .withColumn("tmp", arrays_zip("medicinal_product",
                                  "indication",
                                  "administration_route",
                                  "dosage_form"))
    .withColumn("tmp", explode_outer("tmp"))
    .select(col("tmp.medicinal_product"),
            col("tmp.indication"),
            col("tmp.administration_route"),
            col("tmp.dosage_form")
    )
    .dropDuplicates(["medicinal_product"]))

# List of columns to convert to lowercase
cols_to_lower = ["medicinal_product", "indication", "dosage_form"]

for column_name in cols_to_lower:
    dim_drug_information = dim_drug_information.withColumn(column_name, lower(col(column_name)))

# Strip caret characters from the indication text. The backslash is doubled so the
# Python literal is a valid escape and the regex still receives `\^`.
dim_drug_information = dim_drug_information.withColumn('indication', regexp_replace("indication", "\\^", ""))

# Remove punctuation and digits from the product name.
dim_drug_information = dim_drug_information.withColumn('medicinal_product', trim(regexp_replace("medicinal_product", "[\\(\\):\\[\\-+_\\^\\.?#*;,%\\d]", "")))
# Keep rows that have at least two non-null values.
dim_drug_information = dim_drug_information.dropna(thresh=2)

# Normalize the British spelling so both variants map to the same product.
dim_drug_information = dim_drug_information.withColumn('medicinal_product', regexp_replace("medicinal_product", "aluminium", "aluminum"))

# Keep only the text before the first backslash ...
dim_drug_information = dim_drug_information.withColumn("string_split", split(dim_drug_information["medicinal_product"], "\\\\"))
dim_drug_information = dim_drug_information.withColumn("medicinal_product", lower(dim_drug_information["string_split"].getItem(0)))
dim_drug_information = dim_drug_information.drop('string_split')

# ... and before the first forward slash.
dim_drug_information = dim_drug_information.withColumn("string_split", split(dim_drug_information["medicinal_product"], "/"))
dim_drug_information = dim_drug_information.withColumn("medicinal_product", lower(trim(dim_drug_information["string_split"].getItem(0))))
dim_drug_information = dim_drug_information.drop('string_split')

# Reduce the name to its first space-delimited word. split(...)[0] yields the whole
# name when it contains no space. A substring bounded by instr(name, " ") - 1 would
# get length -1 in that case and return "", so single-word products would be
# discarded by the length filter below.
dim_drug_information = dim_drug_information.withColumn(
    "medicinal_product",
    trim(split(dim_drug_information["medicinal_product"], " ").getItem(0))
)
# Keep names of 5 to 19 characters.
dim_drug_information = dim_drug_information.filter(length(dim_drug_information.medicinal_product) >= 5)
dim_drug_information = dim_drug_information.filter(length(dim_drug_information.medicinal_product) < 20)
# Normalization can collapse different raw names onto the same value; de-duplicate again.
dim_drug_information = dim_drug_information.dropDuplicates(['medicinal_product'])


display(dim_drug_information)

medicinal_product,indication,administration_route,dosage_form
abacavir,product used for unknown indication,65.0,
abasaglar,,,solution for injection in pre-filled pen
abemaciclib,neoplasm malignant,48.0,
abilify,schizophrenia,65.0,suspension for injection
abiraterone,prostate cancer,48.0,
abreva,product used for unknown indication,,
absorbine,product used for unknown indication,61.0,
absorica,product used for unknown indication,65.0,capsule
acamprosate,,,tablet
acarbose,diabetes mellitus,48.0,"tablet, orally disintegrating"


## Parse Report Metadata into Spark DataFrame

In [0]:
# Build the report metadata dimension: report id, country of occurrence, receive and
# transmission dates (parsed from yyyyMMdd strings) and the raw company number.
dim_report_metadata = exploded_df.select(
    col("results.safetyreportid").alias("safety_report_id"),
    col("results.occurcountry").alias("occurance_country"),
    to_date(col("results.receivedate"), "yyyyMMdd").alias("date_report_received"),
    to_date(col("results.transmissiondate"), "yyyyMMdd").alias("date_of_transmission"),
    col("results.companynumb").alias("reported_company")
)

# Split the company number on dashes and keep the lower-cased text between the
# first and second dash as the company name.
dim_report_metadata = dim_report_metadata.withColumn("string_split", split(dim_report_metadata["reported_company"], "-"))
dim_report_metadata = dim_report_metadata.withColumn("reported_company", lower(dim_report_metadata["string_split"].getItem(1)))

# Remove "inc" only when it stands alone (not preceded or followed by a letter or
# digit). A bare "inc" pattern would also cut it out of names such as "incyte".
dim_report_metadata = dim_report_metadata.withColumn('reported_company', trim(regexp_replace("reported_company", "(?<![a-z0-9])inc(?![a-z0-9])", "")))
# Drop commas.
dim_report_metadata = dim_report_metadata.withColumn('reported_company', trim(regexp_replace("reported_company", "[,]", "")))
# Turn dots and underscores into spaces, then trim so a leading or trailing separator
# does not leave whitespace at the edges of the name.
dim_report_metadata = dim_report_metadata.withColumn('reported_company', trim(regexp_replace("reported_company", "[._]", " ")))


dim_report_metadata = dim_report_metadata.drop('string_split')
# Show the flattened data
display(dim_report_metadata)

safety_report_id,occurance_country,date_report_received,date_of_transmission,reported_company
23353582,US,2024-01-01,2024-04-10,sa
23353583,US,2024-01-01,2024-04-10,amgen
23353584,US,2024-01-01,2024-04-10,amgen
23353585,CA,2024-01-01,2024-04-10,abbvie
23353586,US,2024-01-01,2024-04-09,amgen
23353587,US,2024-01-01,2024-04-09,abbvie
23353588,CA,2024-01-01,2024-04-10,abbvie
23353589,CN,2024-01-01,2024-04-10,abbvie
23353590,CA,2024-01-01,2024-04-09,abbvie
23353591,CA,2024-01-01,2024-04-09,abbvie


## Parse Reaction Data into Spark DataFrame

In [0]:
# Build the reaction fact table: one row per (report, reaction) with outcome,
# severity, the first listed drug and its action code, and dosage dates.
fact_reaction = exploded_df.select(
    col("results.safetyreportid").alias("patient_id"),
    col("results.patient.reaction.reactionmeddrapt").alias("patient_reaction"),
    col("results.patient.reaction.reactionoutcome").alias("reaction_outcome"),
    col("results.patient.drug.actiondrug").getItem(0).alias("action_taken"),
    col("results.patient.drug.medicinalproduct").getItem(0).alias("medicinal_product"),
    col("results.seriousnessdeath").alias("patient_death"),
    col("results.seriousnesslifethreatening").alias("life_threatening"),
    col("results.seriousnessdisabling").alias("disabling"),
    col("results.seriousnesshospitalization").alias("hospitalization"),
    col("results.seriousnesscongenitalanomali").alias("congenital_anomaly"),
    col("results.seriousnessother").alias("other_issues"),
    col("results.patient.drug.drugstartdate").alias("start_dosage_date"),
    col("results.patient.drug.drugenddate").alias("end_dosage_date"),
)

# Collapse the seriousness flags into a single rank: the first flag equal to "1"
# wins (1 = death ... 6 = other), and 7 means none of the flags is set.
fact_reaction = fact_reaction.withColumn("reaction_severity",
                                         when(fact_reaction.patient_death == "1", 1)
                                         .when(fact_reaction.life_threatening == "1", 2)
                                         .when(fact_reaction.disabling == "1", 3)
                                         .when(fact_reaction.hospitalization == "1", 4)
                                         .when(fact_reaction.congenital_anomaly == "1", 5)
                                         .when(fact_reaction.other_issues == "1", 6)
                                         .otherwise(7))

# NOTE(review): patient_reaction / reaction_outcome come from the reaction array while
# the dosage dates come from the drug array, so arrays_zip pairs them purely by
# position — confirm this pairing is intended.
# The select keeps only the listed columns, so the seriousness flags are dropped here.
fact_reaction = (fact_reaction
    .withColumn("tmp", arrays_zip("patient_reaction",
                                  "reaction_outcome",
                                  "start_dosage_date",
                                  "end_dosage_date"))
    .withColumn("tmp", explode("tmp"))
    .select(col("patient_id"),
            col("tmp.patient_reaction"),
            col("tmp.reaction_outcome"),
            col("reaction_severity"),
            col("action_taken"),
            col("medicinal_product"),
            to_date(col("tmp.start_dosage_date"), 'yyyyMMdd').alias("dosage_start_date"),
            to_date(col("tmp.end_dosage_date"), 'yyyyMMdd').alias("dosage_end_date"))
    .distinct()
    .dropna(subset=['patient_reaction']))


# List of columns to convert to lowercase
cols_to_lower = ["patient_reaction", "medicinal_product"]

for column_name in cols_to_lower:
    fact_reaction = fact_reaction.withColumn(column_name, lower(col(column_name)))

# Step 1: Define the window specification (all rows of the same report)
window_spec = Window.partitionBy("patient_id")

# Step 2: Get the max dosage start/end date within each patient_id group
df_with_max_date = fact_reaction.withColumn(
    "max_dosage_start_date", max(col("dosage_start_date")).over(window_spec)
).withColumn(
    "max_dosage_end_date", max(col("dosage_end_date")).over(window_spec)
)

# Step 3: Use coalesce to replace NULL dosage dates with the group's max date
df_filled = df_with_max_date.withColumn("start_dosage_date", coalesce("dosage_start_date", "max_dosage_start_date")).withColumn("end_dosage_date", coalesce("dosage_end_date", "max_dosage_end_date"))

# Step 4: Drop the helper columns now that the filled dates exist
fact_reaction = df_filled.drop("max_dosage_start_date", "max_dosage_end_date", "dosage_start_date", "dosage_end_date")

# Remove punctuation and digits from the product name.
fact_reaction = fact_reaction.withColumn('medicinal_product', trim(regexp_replace("medicinal_product", "[\\(\\):\\[\\-+_\\^\\.?#*;,%\\d]", "")))
# Keep rows that have at least two non-null values.
fact_reaction = fact_reaction.dropna(thresh=2)

# Normalize the British spelling so both variants map to the same product.
fact_reaction = fact_reaction.withColumn('medicinal_product', regexp_replace("medicinal_product", "aluminium", "aluminum"))

# Keep only the text before the first backslash ...
fact_reaction = fact_reaction.withColumn("string_split", split(fact_reaction["medicinal_product"], "\\\\"))
fact_reaction = fact_reaction.withColumn("medicinal_product", lower(fact_reaction["string_split"].getItem(0)))
fact_reaction = fact_reaction.drop('string_split')

# ... and before the first forward slash.
fact_reaction = fact_reaction.withColumn("string_split", split(fact_reaction["medicinal_product"], "/"))
fact_reaction = fact_reaction.withColumn("medicinal_product", lower(trim(fact_reaction["string_split"].getItem(0))))
fact_reaction = fact_reaction.drop('string_split')

# Reduce the name to its first space-delimited word. split(...)[0] yields the whole
# name when it contains no space. A substring bounded by instr(name, " ") - 1 would
# get length -1 in that case and return "", so rows with a single-word product would
# be discarded by the length filter below.
fact_reaction = fact_reaction.withColumn(
    "medicinal_product",
    split(fact_reaction["medicinal_product"], " ").getItem(0)
)
# Keep names of 5 to 19 characters.
fact_reaction = fact_reaction.filter(length(fact_reaction.medicinal_product) >= 5)
fact_reaction = fact_reaction.filter(length(fact_reaction.medicinal_product) < 20)


display(fact_reaction)

patient_id,patient_reaction,reaction_outcome,reaction_severity,action_taken,medicinal_product,start_dosage_date,end_dosage_date
23353698,death,5.0,1,6.0,leuprolide,,
23353707,nausea,6.0,7,1.0,aptensio,,
23353707,dry mouth,6.0,7,1.0,aptensio,,
23353741,product formulation issue,3.0,7,4.0,oxycodone,,
23353741,drug ineffective,3.0,7,4.0,oxycodone,,
23353750,drug diversion,6.0,7,5.0,oxycodone,,
23353754,product physical issue,6.0,7,5.0,dextroamphet,,
23353754,drug ineffective,6.0,7,5.0,dextroamphet,,
23353755,drug diversion,6.0,7,5.0,oxycodone,,
23353756,drug diversion,6.0,7,5.0,oxycodone,,


In [0]:
# Print the column names and types of the reaction fact table.
fact_reaction.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- patient_reaction: string (nullable = true)
 |-- reaction_outcome: string (nullable = true)
 |-- reaction_severity: integer (nullable = false)
 |-- action_taken: string (nullable = true)
 |-- medicinal_product: string (nullable = true)
 |-- start_dosage_date: date (nullable = true)
 |-- end_dosage_date: date (nullable = true)



In [0]:
print(latest_file)
# Parse the year from the file name alone: it is the start of the date range that
# follows the last "_". Working on the file name rather than the full path means an
# underscore in a directory name cannot shift which piece is picked.
file_name = latest_file.rsplit("/", 1)[-1]
year = file_name.rsplit("_", 1)[-1].split("-")[0]
print(year)

dbfs:/mnt/adverse-events/raw-data/fda_adverse_events_extract_2024-01-01-TO-2024-12-31.json
2024


In [0]:

# Write each table as a single-part CSV (with header) into the mounted container.
# The output path is absolute so it does not depend on the filesystem's working
# directory; existing output for the same year is overwritten.
output_dir = "/mnt/adverse-events/transformed-data"
tables_to_write = {
    "fact_reaction": fact_reaction,
    "dim_patient": dim_patient,
    "dim_drug_information": dim_drug_information,
    "dim_report_metadata": dim_report_metadata,
}
for table_name, table_df in tables_to_write.items():
    (table_df.repartition(1)
        .write.mode("overwrite")
        .option("header", "true")
        .csv(f"{output_dir}/{table_name}_{year}.csv"))