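The goal of this notebook is to map the EMR extracts onto the telemetry tables and de-identify them. Patients are linked to their telemetry `csID` via MRN, encounter IDs (CSNs) are replaced with random surrogate IDs, all dates are shifted back by a per-patient `Offset`, and only non-identifying columns are written out.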

In [1]:
from pyspark.sql.functions import monotonically_increasing_id,rand,col,round,to_timestamp,date_sub,expr,date_format,lit

First, I load the PatientMap, the dataset that maps each MRN to its telemetry `csID` (it also carries the per-patient date `Offset` used below).

In [2]:
# Telemetry patient map. Later cells read its "patientIdPrimary-id" (MRN),
# "patientIdPrimary-type", "csID" and "Offset" columns.
dfPatients = (
    spark.read
    .format("parquet")
    .load("/fs/ess/scratch/PAS2164/CarescapeComb/Temp2/PatientMap.parquet")
)

Next, I load each of the EMR tables from its CSV extract.

In [3]:
# Every EMR extract lives in one directory and is read with the same options:
# a header row supplies the column names. No schema inference is requested,
# so Spark's CSV default (string columns) applies.
EMR_DIR = "/fs/ess/scratch/PAS2164/CarescapeData"


def read_emr_csv(fileName):
    """Return the EMR extract `fileName` (relative to EMR_DIR) as a DataFrame."""
    reader = spark.read.format("csv").option("header", "true")
    return reader.load("{}/{}".format(EMR_DIR, fileName))


dfAlg = read_emr_csv("Allergy.csv")
dfDem = read_emr_csv("Demographics.csv")
dfFlo = read_emr_csv("Flowsheets.csv")
dfICU = read_emr_csv("ICU.csv")
dfLab = read_emr_csv("Lab_Orders.csv")
dfLoc = read_emr_csv("Location.csv")
dfMed = read_emr_csv("Medication_Orders.csv")
dfOrd = read_emr_csv("Orders.csv")
dfPat = read_emr_csv("Patient.csv")
dfSHx = read_emr_csv("Social_Hx.csv")



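To mask the encounter IDs (CSNs), I first collect every distinct CSN that appears in any EMR table. I then assign each one a random surrogate ID (`CS_PAT_ENC_CSN_ID`).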

In [4]:
from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
# Collect the raw CSN column(s) from every EMR table. Lab and Orders carry two
# encounter IDs (ordering and resulting), so both are renamed to
# PAT_ENC_CSN_ID and included.
dfAlgCSN=dfAlg.select(col('PAT_ENC_CSN_ID'))
dfDemCSN=dfDem.select(col('PAT_ENC_CSN_ID'))
dfFloCSN=dfFlo.select(col('PAT_ENC_CSN_ID'))
dfICUCSN=dfICU.select(col('PAT_ENC_CSN_ID'))
dfLocCSN=dfLoc.select(col('PAT_ENC_CSN_ID'))
dfMedCSN=dfMed.select(col('PAT_ENC_CSN_ID'))
dfPatCSN=dfPat.select(col('PAT_ENC_CSN_ID'))
dfOrdCSN1=dfOrd.select(col("Resulting_CSN").alias("PAT_ENC_CSN_ID"))
dfOrdCSN2=dfOrd.select(col("Ordering_CSN").alias("PAT_ENC_CSN_ID"))
dfLabCSN1=dfLab.select(col("Resulting_CSN").alias("PAT_ENC_CSN_ID"))
dfLabCSN2=dfLab.select(col("Ordering_CSN").alias("PAT_ENC_CSN_ID"))

# Combine them all into one DataFrame and keep the unique values.
dfs = [dfAlgCSN,dfDemCSN,dfFloCSN,dfICUCSN,dfLocCSN,dfMedCSN,dfPatCSN,dfOrdCSN1,dfOrdCSN2,dfLabCSN1,dfLabCSN2]
dfCSNall = reduce(DataFrame.union, dfs)
# A null CSN (e.g. an order with no resulting encounter) can never match in the
# inner joins that apply the mask, so it gets no surrogate ID.
dfCSN = dfCSNall.where(col("PAT_ENC_CSN_ID").isNotNull()).distinct()

# Shuffle the CSNs randomly and number them; the number is the masked CSN.
# rand() is left unseeded, so the surrogate order cannot be regenerated from
# this notebook alone.
dfCSN = dfCSN.orderBy(F.rand())
dfCSN = dfCSN.withColumn("CS_PAT_ENC_CSN_ID", monotonically_increasing_id())

# rand() and monotonically_increasing_id() are non-deterministic, and Spark
# re-evaluates an uncached DataFrame for every action that uses it. Without
# materializing here, each table written later (one job per write) could get
# a *different* CSN -> CS_PAT_ENC_CSN_ID mapping, silently breaking linkage
# between the output tables. localCheckpoint() computes the mapping once and
# truncates its lineage, so later jobs read the stored rows instead of
# re-running rand(). Local checkpoints live on the executors and are not
# fault-tolerant. To reuse one mapping across sessions, write dfCSN to Parquet
# and read it back instead.
dfCSN = dfCSN.localCheckpoint(eager=True)

I join the telemetry patient map to the EMR patient table on MRN. I then use `PAT_ID` (or, for the ICU and Location tables, the encounter CSN) to join that mapping with each of the other EMR tables, replacing every raw CSN with its masked ID along the way.

In [24]:
def mask_csn(df, rawCsn, csnMap, outName="CS_PAT_ENC_CSN_ID", dropRaw=True):
    """Attach the masked CSN to each row of `df` via an inner join with `csnMap`.

    df      -- DataFrame containing the raw CSN column referenced by `rawCsn`.
    rawCsn  -- Column object holding the raw CSN (e.g. dfAlg["PAT_ENC_CSN_ID"]).
               It is a Column rather than a name because some joined frames
               contain two columns called PAT_ENC_CSN_ID.
    csnMap  -- DataFrame with PAT_ENC_CSN_ID and CS_PAT_ENC_CSN_ID (dfCSN).
    outName -- name of the masked-CSN column in the result.
    dropRaw -- remove `rawCsn` from the result. The drop happens before the
               rename, so `outName` may reuse the raw column's name.

    Rows whose CSN is not in csnMap are dropped (inner join). The mapping's
    columns are re-aliased on every call, so the same csnMap can be joined
    more than once into one lineage (Lab/Orders) without ambiguous self-join
    column references.
    """
    mapping = csnMap.select(
        col("PAT_ENC_CSN_ID").alias("_rawCSN"),
        col("CS_PAT_ENC_CSN_ID").alias("_maskedCSN"),
    )
    masked = df.join(mapping, rawCsn == mapping["_rawCSN"], "inner").drop("_rawCSN")
    if dropRaw:
        masked = masked.drop(rawCsn)
    return masked.withColumnRenamed("_maskedCSN", outName)


# Telemetry patient map joined to the EMR patient/encounter table on MRN.
dfOut = dfPatients.join(dfPat, dfPatients["patientIdPrimary-id"] == dfPat["PAT_MRN_ID"], "inner")

# One row per (MRN, type, PAT_ID, Offset, csID): the patient-level key used to
# attach csID and Offset to the PAT_ID-keyed EMR tables.
dfODist = dfOut.select("patientIdPrimary-id", "patientIdPrimary-type", "PAT_ID", "Offset", "csID").distinct()

# Allergies, demographics, flowsheets: join by patient, then mask the CSN.
dfAlgO1 = dfODist.join(dfAlg, dfODist["PAT_ID"] == dfAlg["PAT_ID"], "inner")
dfAlgO = mask_csn(dfAlgO1, dfAlg["PAT_ENC_CSN_ID"], dfCSN)

dfDemO1 = dfODist.join(dfDem, dfODist["PAT_ID"] == dfDem["PAT_ID"], "inner")
dfDemO = mask_csn(dfDemO1, dfDem["PAT_ENC_CSN_ID"], dfCSN)

dfFloO1 = dfODist.join(dfFlo, dfODist["PAT_ID"] == dfFlo["PAT_ID"], "inner")
dfFloO = mask_csn(dfFloO1, dfFlo["PAT_ENC_CSN_ID"], dfCSN)

# ICU and Location are joined by encounter (CSN) to the patient/encounter
# table rather than by PAT_ID, so only encounters present in Patient.csv are kept.
# NOTE(review): confirm this differs from the PAT_ID join on purpose.
dfICUO1 = dfOut.join(dfICU, dfOut["PAT_ENC_CSN_ID"] == dfICU["PAT_ENC_CSN_ID"], "inner")
dfICUO = mask_csn(dfICUO1, dfICU["PAT_ENC_CSN_ID"], dfCSN)

dfLocO1 = dfOut.join(dfLoc, dfOut["PAT_ENC_CSN_ID"] == dfLoc["PAT_ENC_CSN_ID"], "inner")
dfLocO = mask_csn(dfLocO1, dfLoc["PAT_ENC_CSN_ID"], dfCSN)

dfMedO1 = dfODist.join(dfMed, dfODist["PAT_ID"] == dfMed["PAT_ID"], "inner")
dfMedO = mask_csn(dfMedO1, dfMed["PAT_ENC_CSN_ID"], dfCSN)

# Social history is joined by patient only; no CSN is masked for it here.
dfSHxO = dfODist.join(dfSHx, dfODist["PAT_ID"] == dfSHx["PAT_ID"], "inner")

# Labs and orders carry two CSNs; each is replaced in place by its masked value.
dfLabO1 = dfODist.join(dfLab, dfODist["PAT_ID"] == dfLab["PAT_ID"], "inner")
dfLabO2 = mask_csn(dfLabO1, dfLab["Ordering_CSN"], dfCSN, outName="Ordering_CSN")
dfLabO = mask_csn(dfLabO2, dfLab["Resulting_CSN"], dfCSN, outName="Resulting_CSN")

dfOrdO1 = dfODist.join(dfOrd, dfODist["PAT_ID"] == dfOrd["PAT_ID"], "inner")
dfOrdO2 = mask_csn(dfOrdO1, dfOrd["Ordering_CSN"], dfCSN, outName="Ordering_CSN")
dfOrdO = mask_csn(dfOrdO2, dfOrd["Resulting_CSN"], dfCSN, outName="Resulting_CSN")

# Done last: the ICU/Location joins above need dfOut before the mask is added.
# The raw PAT_ENC_CSN_ID is kept on dfOut, as before; it is not exported.
dfOut = mask_csn(dfOut, dfOut["PAT_ENC_CSN_ID"], dfCSN, dropRaw=False)



Then, for each of those tables, I shift every date back by the patient's `Offset` (in days; the time of day is kept as-is) and keep only the non-identifying columns.

In [25]:
def add_offset_datetime(df, sourceCol, dateCol, timeCol=None):
    """Write the date of `sourceCol`, shifted back by `Offset` days, to `dateCol`.

    `sourceCol` is parsed with to_timestamp. `dateCol` receives
    date_sub(timestamp, Offset), which is a date with no time component. When
    `timeCol` is given, it receives the (unshifted) HH:mm:ss time of day.
    The DataFrame must have an `Offset` column.
    The parsed, un-shifted timestamp is only a temporary column and is dropped,
    so the real datetime is not left on the frame.
    """
    tsCol = "_ts_" + sourceCol
    shifted = df.withColumn(tsCol, to_timestamp(col(sourceCol)))
    shifted = shifted.withColumn(dateCol, expr("date_sub(`{}`, Offset)".format(tsCol)))
    if timeCol is not None:
        shifted = shifted.withColumn(timeCol, date_format(col(tsCol), "HH:mm:ss"))
    return shifted.drop(tsCol)


AlgOut = dfAlgO.select('csID', 'Allergy', 'Allergy_Reaction', 'CS_PAT_ENC_CSN_ID')

DemOut = dfDemO.select("csID", "Gender", "Age", "Marital_Status", "Patient_Race", "Ethnicity",
                       "Preferred_Language", "ABO_RH_Blood_Type", 'CS_PAT_ENC_CSN_ID')

dfFloO = add_offset_datetime(dfFloO, "RECORDED_TIME", "offsetDate", "offsetTime")
FloOut = dfFloO.select("csID", "FLO_MEAS_ID", "flo_meas_name", "disp_name", "MEAS_VALUE", "MEAS_COMMENT",
                       "offsetDate", "offsetTime", "fsd_id", "Flow_Type", 'CS_PAT_ENC_CSN_ID')

dfICUO = add_offset_datetime(dfICUO, "Start_Time", "offsetStartDate", "offsetStartTime")
dfICUO = add_offset_datetime(dfICUO, "End_Time", "offsetEndDate", "offsetEndTime")
ICUOut = dfICUO.select("csID", "event_id", "IN_Event_Type", "DEPARTMENT_ID", "DEPARTMENT_NAME",
                       "offsetStartDate", "offsetStartTime", "offsetEndDate", "offsetEndTime",
                       "OUT_Event_Type", "LOS", 'CS_PAT_ENC_CSN_ID')

dfLocO = add_offset_datetime(dfLocO, "Start_Time", "offsetStartDate", "offsetStartTime")
dfLocO = add_offset_datetime(dfLocO, "End_Time", "offsetEndDate", "offsetEndTime")
LocOut = dfLocO.select("csID", "event_id", "IN_Event_Type", "DEPARTMENT_ID", "DEPARTMENT_NAME", "BED_ID",
                       "ROOM_ID", "offsetStartDate", "offsetStartTime", "offsetEndDate", "offsetEndTime",
                       "OUT_Event_Type", "LOS", 'CS_PAT_ENC_CSN_ID')

dfMedO = add_offset_datetime(dfMedO, "START_DATE", "offsetStartDate", "offsetStartTime")
dfMedO = add_offset_datetime(dfMedO, "END_DATE", "offsetEndDate", "offsetEndTime")
dfMedO = add_offset_datetime(dfMedO, "ORDERING_DATE", "offsetOrderingDate")
# NOTE(review): MAR_TAKEN_TIME is exported un-shifted. If it holds a full
# datetime (not just a time of day), it leaks the real date and should go
# through add_offset_datetime as well.
MedOut = dfMedO.select("csID", "MEDICATION", "offsetOrderingDate", "offsetStartDate", "offsetStartTime",
                       "offsetEndDate", "offsetEndTime", "ORDERING_MODE", "HV_DISCRETE_DOSE",
                       "HV_DISCRET_DOSE_UNIT", "FREQUENCY", "MAR_DOSE", "MAR_DOSE_UNIT", "MAR_TAKEN_TIME",
                       "MAR_ACTION", "MAR_ROUTE", 'CS_PAT_ENC_CSN_ID')

dfSHxO = add_offset_datetime(dfSHxO, "SMOKING_QUIT_DATE", "offsetSMOKING_QUIT_DATE")
dfSHxO = add_offset_datetime(dfSHxO, "SMOKELESS_QUIT_DATE", "offsetSMOKELESS_QUIT_DATE")
# The raw SMOKELESS_QUIT_DATE was previously exported next to its offset
# version, leaking the real date; only the offset column is kept now.
# NOTE(review): the *_COMMENT / *_CMT free-text columns may contain identifiers.
SHxOut = dfSHxO.select("csID", "TOBACCO_USER_C", "SMOKING_TOB_USE_C", "offsetSMOKING_QUIT_DATE",
                       "offsetSMOKELESS_QUIT_DATE", "CIGARETTES_YN", "PIPES_YN", "CIGARS_YN",
                       "TOBACCO_PAK_PER_DY", "TOBACCO_USED_YEARS", "TOBACCO_COMMENT", "SMOKELESS_TOB_USE",
                       "SNUFF_YN", "CHEW_YN", "ALCOHOL_USE_C", "ALCOHOL_SRC_C", "ALCOHOL_FREQ_C",
                       "ALCOHOL_DRINKS_PER_DAY_C", "ALCOHOL_BINGE_C", "ALCOHOL_OZ_PER_WK", "ALCOHOL_COMMENT",
                       "DRUG_SRC_C", "IV_DRUG_USER_YN", "ILL_DRUG_USER_C", "ILLICIT_DRUG_FREQ",
                       "ILLICIT_DRUG_CMT")

dfLabO = add_offset_datetime(dfLabO, "ORDERING_DATE", "offsetOrderDate", "offsetOrderTime")
dfLabO = add_offset_datetime(dfLabO, "RESULT_DATE", "offsetResultDate", "offsetResultTime")
LabOut = dfLabO.select("csID", "Order_type", "DESCRIPTION", "PROC_ID", "COMPONENT_NAME", "ORDER_STATUS_C",
                       "order_status", "offsetOrderDate", "offsetOrderTime", "offsetResultDate",
                       "offsetResultTime", "ORD_NUM_VALUE", "ORD_VALUE", "COMPONENT_COMMENT",
                       'Ordering_CSN', 'Resulting_CSN')

dfOrdO = add_offset_datetime(dfOrdO, "ORDERING_DATE", "offsetOrderDate", "offsetOrderTime")
dfOrdO = add_offset_datetime(dfOrdO, "RESULT_DATE", "offsetResultDate", "offsetResultTime")
OrdOut = dfOrdO.select("csID", "Order_type", "DESCRIPTION", "PROC_ID", "ORDER_STATUS_C", "order_status",
                       "offsetOrderDate", "offsetOrderTime", "offsetResultDate", "offsetResultTime",
                       "ORD_NUM_VALUE", "ORD_VALUE", "COMPONENT_COMMENT", 'Ordering_CSN', 'Resulting_CSN')

dfOut = add_offset_datetime(dfOut, "HOSP_ADMSN_TIME", "offsetHospAdmsnDate", "offsetHospAdmsnTime")
dfOut = add_offset_datetime(dfOut, "HOSP_DISCH_TIME", "offsetHospDischDate", "offsetHospDischTime")
PatOut = dfOut.select("csID", "Admission_Type", "Admit_Source", "LOS_Hours", "REASON_VISIT_NAME",
                      "offsetHospAdmsnDate", "offsetHospAdmsnTime", "offsetHospDischDate",
                      "offsetHospDischTime", "Discharge_Destination", "Primary_ICD10", "Primary_Diagnosis",
                      "Secondary_ICD10", "Secondary_Diagnosis", 'CS_PAT_ENC_CSN_ID')

With the de-identified datasets built, I write each EMR table out as a Parquet file.

In [26]:
# Destination directory for the de-identified EMR tables.
OUTPUT_DIR = "/fs/ess/scratch/PAS2164"

# (file stem, DataFrame) pairs, written in this order as <OUTPUT_DIR>/<stem>.parquet.
emrOutputs = [
    ("Patient", PatOut),
    ("Allergies", AlgOut),
    ("Demographics", DemOut),
    ("Flowsheet", FloOut),
    ("ICU", ICUOut),
    ("Location", LocOut),
    ("Medications", MedOut),
    ("SocialHx", SHxOut),
    ("Labs", LabOut),
    ("Orders", OrdOut),
]

for stem, frame in emrOutputs:
    # "overwrite" replaces any output from a previous run. The former
    # .option("mergeSchema", "true") was dropped: for Parquet it is a
    # read-side option and has no effect when writing.
    frame.write.mode("overwrite").parquet("{}/{}.parquet".format(OUTPUT_DIR, stem))