In [0]:
# Fetch the AWS credentials from the Databricks secret scope so they never
# appear in the notebook source.
access_key = dbutils.secrets.get(scope="aws-dbricks", key="aws_access_key_id")
secret_key = dbutils.secrets.get(scope="aws-dbricks", key="aws_secret_access_key")

# Single definition of the bucket and where it is mounted in DBFS.
S3_SOURCE = "s3a://***"
MOUNT_POINT = "/mnt/***"

# Mount the S3 bucket only when nothing is mounted at MOUNT_POINT yet.
# dbutils.fs.mount raises if the mount point is already in use, which would
# make this cell fail on every re-run of the notebook.
already_mounted = any(
    mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()
)
if not already_mounted:
    dbutils.fs.mount(
        source=S3_SOURCE,
        mount_point=MOUNT_POINT,
        extra_configs={
            "fs.s3a.access.key": access_key,
            "fs.s3a.secret.key": secret_key,
        },
    )

# List the files in the mounted bucket (last expression, so it is displayed).
dbutils.fs.ls(MOUNT_POINT)

In [0]:

from pyspark.sql.functions import col,isnan, when, count

def _with_lowercase_columns(frame):
    """Return `frame` with every column name converted to lower case."""
    return frame.toDF(*[name.lower() for name in frame.columns])


# --- Procedures ---------------------------------------------------------------
# Per-admission procedure rows, read with the first CSV line as the header.
df_procedures_icd_raw = spark.read.csv("/mnt/****/PROCEDURES_ICD.csv", header='true')

# Procedure-code lookup from the Excel sheet. Only the code and its long
# description are kept; they are renamed so the code column can be joined on
# ICD9_CODE and the description reads as "preventive_measurement".
procedure_code_reader = (
    spark.read.format("com.crealytics.spark.excel")
    .option("location", "dbfs:/mnt/****/CMS32_DESC_LONG_SHORT_SG.xlsx")
    .option("header", "true")
    .option("treatEmptyValuesAsNulls", "false")
    .option("inferSchema", "false")
    .option("addColorColumns", "false")
)
df_procedures_code_raw = (
    procedure_code_reader
    .load("/mnt/****/CMS32_DESC_LONG_SHORT_SG.xlsx")
    .drop("SHORT DESCRIPTION", "_c3")
    .withColumnRenamed("PROCEDURE CODE", "ICD9_CODE")
    .withColumnRenamed("LONG DESCRIPTION", "preventive_measurement")
)

# Same lookup table, with the description column relabelled for the
# diagnoses join further down.
df_procedures_code_raw2 = df_procedures_code_raw.withColumnRenamed(
    "preventive_measurement", "diagnosed_condition"
)

# Attach the procedure description to each procedure row, then lower-case
# all column names.
df_procedures_code_processed = _with_lowercase_columns(
    df_procedures_icd_raw.join(df_procedures_code_raw, on=['ICD9_CODE'], how='inner')
)

# --- Diagnoses ----------------------------------------------------------------
df_diagnoses_icd_raw = spark.read.csv("/mnt/****/DIAGNOSES_ICD.csv", header='true')

# NOTE(review): the diagnosis codes are matched against the *procedure* code
# lookup above, so "diagnosed_condition" is filled with procedure
# descriptions. This looks unintended -- confirm whether a diagnosis-code
# dictionary should be used here instead.
df_diagnoses_icd_processed = _with_lowercase_columns(
    df_diagnoses_icd_raw.join(df_procedures_code_raw2, on=['ICD9_CODE'], how='inner')
)

# --- Prescriptions ------------------------------------------------------------
df_prescriptions_raw = spark.read.csv("/mnt/****/PRESCRIPTIONS.csv", header='true')

# icustay_id is dropped: per the original author's note it is mostly null and
# can be derived from the startdate / enddate columns.
df_prescriptions_processed = df_prescriptions_raw.drop("icustay_id")

# --- Merge and clean ----------------------------------------------------------
# NOTE(review): both outer joins use subject_id only, so every diagnosis row
# of a subject is paired with every procedure and prescription row of that
# subject before de-duplication. Which row survives dropDuplicates is not
# controlled here -- confirm this is the intended result.
merged_df_raw = (
    df_diagnoses_icd_processed
    .join(df_procedures_code_processed, on='subject_id', how='outer')
    .join(df_prescriptions_processed, on='subject_id', how='outer')
)

# De-duplicate on subject/admission, then remove the bookkeeping columns.
df1 = (
    merged_df_raw
    .dropDuplicates(['subject_id', 'hadm_id'])
    .drop("icd9_code", "row_id", "seq_num", "hadm_id")
)

# Keep only rows that have both a prescription start and end date.
df2 = df1.dropna(subset=["startdate", "enddate"])

display(df2)


subject_id,diagnosed_condition,preventive_measurement,startdate,enddate,drug_type,drug,drug_name_poe,drug_name_generic,formulary_drug_cd,gsn,ndc,prod_strength,dose_val_rx,dose_unit_rx,form_val_disp,form_unit_disp,route
10017,"Arthroscopy, foot and toe",Partial shoulder replacement,2149-05-26 00:00:00,2149-05-26 00:00:00,MAIN,Hydromorphone,Hydromorphone,Hydromorphone,HYDR2I,4103.0,74131230,2mg/mL Syringe,2-6,mg,1-3,SYR,SC
10027,"Esophagectomy, not otherwise specified",Open and other replacement of mitral valve with tissue graft,2190-07-20 00:00:00,2190-07-25 00:00:00,MAIN,Potassium Chloride,Potassium Chloride,Potassium Chloride,KCL20P,1262.0,456066270,20mEq Packet,20,mEq,1,PKT,PO
10029,Other diagnostic procedures on biliary tract,Infusion of vasopressor agent,2139-09-23 00:00:00,2139-09-23 00:00:00,BASE,NS,,,NS1000,1210.0,338004904,1000mL Bag,1000,ml,1,BAG,IV
10032,Other repair of urethra,Transposition of cranial and peripheral nerves,2138-04-02 00:00:00,2138-04-15 00:00:00,MAIN,Prochlorperazine,Prochlorperazine,Prochlorperazine Maleate,PROC10,3846.0,7336721,10MG TAB,10,mg,1,TAB,PO
10038,Other repair of urethra,"Venous catheterization, not elsewhere classified",2144-02-10 00:00:00,2144-02-11 00:00:00,MAIN,Lorazepam,Lorazepam,Lorazepam,LORA2I,3753.0,74198530,2mg/mL Syringe,0.5,mg,0.25,ml,IV
10059,Other repair of urethra,Arterial catheterization,2150-08-23 00:00:00,2150-08-23 00:00:00,MAIN,Erythromycin,,,ERY1I,9251.0,74647844,1000 mg Vial,250,mg,250,mg,IV
10059,Other repair of urethra,Arterial catheterization,2150-08-07 00:00:00,2150-08-13 00:00:00,MAIN,Sodium Chloride 0.9% Flush,Sodium Chloride 0.9% Flush,Sodium Chloride 0.9% Flush,NACLFLUSH,,0,Syringe,3,ml,0.6,SYR,IV
10059,Other repair of urethra,Endoscopic excision or destruction of lesion or tissue of esophagus,2150-08-07 00:00:00,2150-08-13 00:00:00,MAIN,Sodium Chloride 0.9% Flush,Sodium Chloride 0.9% Flush,Sodium Chloride 0.9% Flush,NACLFLUSH,,0,Syringe,3,ml,0.6,SYR,IV
10059,Other repair of urethra,Arterial catheterization,2150-08-23 00:00:00,2150-08-23 00:00:00,MAIN,Erythromycin,,,ERY1I,9251.0,74647844,1000 mg Vial,250,mg,250,mg,IV
10059,Other repair of urethra,Arterial catheterization,2150-08-07 00:00:00,2150-08-13 00:00:00,MAIN,Sodium Chloride 0.9% Flush,Sodium Chloride 0.9% Flush,Sodium Chloride 0.9% Flush,NACLFLUSH,,0,Syringe,3,ml,0.6,SYR,IV
