In [None]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col, datediff, lit
from pyspark.sql.types import *
from os import walk
import json
import os
import sys
from Helper_Code.helpfull_functions import *
from Helper_Code.helper_variables import *
from Helper_Code.quality_checks import *


In [None]:
# get_spark_session is a helper (presumably from one of the Helper_Code star imports) that creates
# the Spark session with the project's configs. It is called here with "test" (presumably the
# application name — confirm) and an empty SparkConf.
spark = get_spark_session("test", SparkConf())

In [None]:
# merge_files is a (potentially obsolete) helper that combines all the parquet files in a folder into
# one DataFrame; show=False skips displaying the result.
# Load the OMOP concept table; later cells join it on concept_id to look up the name and vocabulary
# of condition, procedure, drug, measurement and observation concept ids.
concept = merge_files("/home/jupyter/omop-ed-datapipeline/concept", spark, show = False)

In [None]:
# Load visit occurrences
visit_occurrence = merge_files("/home/jupyter/omop-ed-datapipeline/visit_occurrence", spark, show = False)

# Only the identifiers and the start/end timestamps of each visit are needed
visit_length_columns = ['person_id', 'visit_occurrence_id', 'visit_start_datetime', 'visit_end_datetime']

# Visit duration in seconds: both timestamps are cast to epoch seconds (long) and subtracted
visit_seconds = col("visit_end_datetime").cast("long") - col('visit_start_datetime').cast("long")
visit_length = visit_occurrence.select(visit_length_columns).withColumn('visit_length', visit_seconds)
visit_length.show()

In [None]:
# Load the payer plan period table (show=True displays the merged DataFrame).
# NOTE(review): unlike every other table this path is under co-morbidity-omop rather than
# omop-ed-datapipeline, and payer_plan_period is not used by any later cell visible here —
# confirm whether this cell is still needed.
payer_plan_period = merge_files("/home/jupyter/co-morbidity-omop/payer_plan_period", spark, show= True)


In [None]:
# Person file is mostly demographic info
person = merge_files("/home/jupyter/omop-ed-datapipeline/person", spark, show = False)

# Demographic columns kept from the person table
demographics_cols = ['person_id', 'birth_datetime', 'death_datetime', 'gender_source_value', 'race_source_value', 'ethnicity_source_value']

# Patient age depends on the visit date, so the person table is first joined to the visit times.
# An inner join is used: rows missing a birth date or a visit start would get a null age and be
# removed by the age filter anyway, so this is equivalent to an outer join here.
demo = (
    person.select(demographics_cols)
    .join(visit_length, on = "person_id", how = "inner")
    # Age in years at the start of each visit
    .withColumn("age", datediff(col("visit_start_datetime"), col("birth_datetime")) / 365.25)
    # Keep only visits at which the patient was 65 or older
    .where(col("age") >= 65)
    # Collapse to one row per patient. The age differs between visits, so dropping the visit
    # columns and calling distinct() would still leave one row per distinct visit age (and
    # multiply every later join on person_id). Keep the age at the latest qualifying visit.
    .groupBy(*demographics_cols)
    .agg(F.max("age").alias("age"))
)
demo.show()

In [None]:
# Save the over-65 demographics as CSV.
# Spark writes one part-file per partition, so "democsv" is a directory of part-*.csv files.
# Calling demo.coalesce(1) before .write would produce a single part-file, at the cost of
# sending every row through one task.
# mode("overwrite") replaces earlier output so the cell can be re-run; the default save mode
# raises an error when the target path already exists.
demo.write.mode("overwrite").option("header",True).csv("democsv")

In [None]:
## Condition file contains all the diagnoses a patient might have, as ICD-9/10 or SNOMED codes.
condition_occurrence = merge_files("/home/jupyter/omop-ed-datapipeline/condition_occurrence", spark, show = False)

# Columns kept from condition_occurrence
condition_columns = ['person_id', 'condition_occurrence_id', 'condition_occurrence_source_id', 'condition_concept_id', 'condition_start_datetime', 'condition_end_datetime', 'condition_type_concept_id', 'condition_status_concept_id', 'visit_occurrence_id', 'condition_source_value', 'condition_source_concept_id']

# Columns collected into one list per visit; the output keeps these names
condition_list_columns = ['condition_concept_id', 'condition_source_value', 'concept_name', 'domain_id', 'vocabulary_id', 'concept_class_id', 'concept_code']

conditions_selected = condition_occurrence.select(condition_columns)

# Attach concept metadata (concept_name, domain_id, vocabulary_id, concept_class_id, concept_code)
# by matching condition_concept_id against concept.concept_id.
conditions_with_concept = conditions_selected.join(concept, conditions_selected.condition_concept_id == concept.concept_id)

# Bring in the start time of the visit each condition row belongs to.
visit_starts = visit_length.select('visit_occurrence_id', 'visit_start_datetime')
conditions_at_visit = conditions_with_concept.join(visit_starts, on = "visit_occurrence_id", how = "inner")

# "Current" conditions: started before the visit started and had not ended by then.
# A null condition_end_datetime is treated as "not ended"; a bare `end > visit_start` comparison
# evaluates to null for such rows and would silently drop every open-ended condition.
# The inner join above is equivalent to an outer join because this filter discards rows without
# a visit start or a condition start anyway.
# NOTE(review): the join is on the condition's own visit_occurrence_id, so only conditions recorded
# at a visit that started before that same visit are found; conditions from the person's earlier
# visits are not considered — confirm this matches the intent.
current_conditions = conditions_at_visit.where(
    (col("condition_start_datetime") < col("visit_start_datetime"))
    & (col("condition_end_datetime").isNull() | (col("condition_end_datetime") > col("visit_start_datetime")))
)

# One row per visit instead of one row per condition. F is pyspark.sql.functions (imported as F in
# the first cell). F.collect_list gathers each group's values into an array; F also provides other
# aggregates such as F.avg, F.sum and F.count.
# .alias() names each list directly, instead of renaming the generated "collect_list(...)" columns
# afterwards (withColumnRenamed is a silent no-op when the generated name does not match).
conditions = current_conditions.groupBy("visit_occurrence_id", "person_id").agg(
    *[F.collect_list(c).alias(c) for c in condition_list_columns]
)

In [None]:
# CSV cannot store array columns, so every list-valued column produced by the aggregation
# is replaced by its string form; all other columns pass through unchanged, in the same order.
columns_to_string = ['condition_concept_id', 'condition_source_value', 'concept_name', 'domain_id', 'vocabulary_id', 'concept_class_id', 'concept_code']
conditions = conditions.select(*[
    col(name).cast('string').alias(name) if name in columns_to_string else col(name)
    for name in conditions.columns
])

In [None]:
# Write the per-visit condition lists as CSV; overwrite so the cell can be re-run.
conditions.write.mode("overwrite").option("header",True).csv("conditions")

In [None]:
# Procedures follow the same pattern as conditions; the differences are commented inline.

procedure = merge_files("/home/jupyter/omop-ed-datapipeline/procedure_occurrence", spark, show = False)

# Columns kept from procedure_occurrence
procedure_columns = ['person_id', 'procedure_concept_id', 'procedure_datetime', 'visit_occurrence_id', 'procedure_source_value']

# Columns collected into one list per visit -> output column name. concept_name and vocabulary_id
# get a "_procedure" suffix so they do not clash with the condition columns when the tables are
# joined later.
procedure_list_columns = {
    'procedure_datetime': 'procedure_datetime',
    'procedure_source_value': 'procedure_source_value',
    'concept_name': 'concept_name_procedure',
    'vocabulary_id': 'vocabulary_id_procedure',
}

procedures_selected = procedure.select(procedure_columns)

procedure_occurrence = (
    procedures_selected
    .join(concept, procedures_selected.procedure_concept_id == concept.concept_id)
    # vocabulary_id (from concept) names the code system (ICD, SNOMED, CPT4, RxNorm, ...);
    # procedures are mostly CPT codes, so keep CPT4 only.
    .where(col("vocabulary_id") == 'CPT4')
    # No visit-level columns are needed, so the frame is not joined to visit_length.
    # Such a join would add no kept columns, and visit-only rows would be removed by the CPT4
    # filter. This assumes visit_occurrence_id is unique in visit_length.
    .groupBy("visit_occurrence_id", "person_id")
    # .alias() names each list directly instead of renaming generated "collect_list(...)" columns
    .agg(*[F.collect_list(src).alias(dst) for src, dst in procedure_list_columns.items()])
)

In [None]:
# CSV cannot store array columns: cast each aggregated list column to a string, keep the rest as-is.
columns_to_string = [ 'procedure_datetime', 'procedure_source_value', 'concept_name_procedure', 'vocabulary_id_procedure']
procedure_occurrence = procedure_occurrence.select(*[
    col(name).cast('string').alias(name) if name in columns_to_string else col(name)
    for name in procedure_occurrence.columns
])

In [None]:
# Write the per-visit CPT4 procedure lists as CSV; overwrite so the cell can be re-run.
procedure_occurrence.write.mode("overwrite").option("header",True).csv("procedure")

In [None]:
# Drugs follow the same pattern as procedures and conditions: select, attach concept metadata,
# then aggregate to one row per visit.

drug_exposure = merge_files("/home/jupyter/omop-ed-datapipeline/drug_exposure", spark, show = False)

# Columns kept from drug_exposure
drug_columns = ['person_id', 'drug_concept_id', 'drug_exposure_start_datetime', 'drug_exposure_end_datetime', 'refills', 'quantity', 'route_concept_id', 'visit_occurrence_id', 'drug_generic_source_value_name', 'route_source_value']

# Columns collected into one list per visit -> output column name. vocabulary_id is suffixed so it
# does not clash with the condition columns when the tables are joined later.
drug_list_columns = {
    'drug_concept_id': 'drug_concept_id',
    'drug_exposure_start_datetime': 'drug_exposure_start_datetime',
    'drug_exposure_end_datetime': 'drug_exposure_end_datetime',
    'drug_generic_source_value_name': 'drug_generic_source_value_name',
    'vocabulary_id': 'vocabulary_id_drug',
}

drugs_selected = drug_exposure.select(drug_columns)

drugs = (
    drugs_selected
    .join(concept, drugs_selected.drug_concept_id == concept.concept_id)
    .groupBy("visit_occurrence_id", "person_id")
    # .alias() names each list directly; renaming the generated "collect_list(...)" names with
    # withColumnRenamed silently does nothing if a generated name does not match.
    .agg(*[F.collect_list(src).alias(dst) for src, dst in drug_list_columns.items()])
)

In [None]:
# The measurement file holds things like GCS scores and lab results.
measurement_columns = ['person_id', 'measurement_concept_id', 'measurement_datetime', 'order_datetime', 'measurement_time', 'measurement_type_concept_id', 'value_as_number', 'value_as_string', 'visit_occurrence_id', 'measurement_source_value', 'measurement_source_value_alt']
measurement = merge_files("/home/jupyter/omop-ed-datapipeline/measurement", spark, show = False).select(measurement_columns)

# Attach concept metadata (e.g. concept_name) for each measurement_concept_id
measurement = measurement.join(concept, measurement.measurement_concept_id == concept.concept_id)

In [None]:
# Best attempt so far at pulling triage and GCS rows: match on the concept name.
# Column.contains() is case-sensitive, so "triage" is matched against the lower-cased concept name
# to also catch capitalised spellings (e.g. "Triage"). "GCS" is an upper-case acronym and is
# matched as-is.
measurement2 = measurement.where(
    F.lower(col("concept_name")).contains('triage') | col("concept_name").contains('GCS')
)
measurement2.show()

In [None]:
# Write the triage/GCS measurements as CSV; overwrite so the cell can be re-run.
measurement2.write.mode("overwrite").option("header",True).csv("GCS_Traige")

The cells below are scratch code from trying (so far unsuccessfully) to find triage data, and from figuring out some other functionality.

In [None]:
# Load observations, keep the columns of interest and attach concept metadata (e.g. concept_name).
observation_columns = ['person_id', 'observation_id', 'specimen_id', 'observation_concept_id', 'observation_datetime', 'observation_type_concept_id', 'value_as_number', 'value_as_string',  'visit_occurrence_id', 'observation_event_id', 'obs_event_field_concept_id']
observation = merge_files("/home/jupyter/omop-ed-datapipeline/observation", spark, show= False).select(observation_columns)
observation = observation.join(concept, observation.observation_concept_id == concept.concept_id)
observation.show()

In [None]:
# Observations whose concept name mentions triage. Column.contains() is case-sensitive, so the
# concept name is lower-cased first to also catch capitalised spellings (e.g. "Triage").
observation2 = observation.where(F.lower(col("concept_name")).contains('triage'))
observation2.show()

In [None]:
# Write the triage observations as CSV; overwrite so the cell can be re-run.
observation2.write.mode("overwrite").option("header",True).csv("Traige")

In [None]:
# CSV cannot store array columns: cast each aggregated list column to a string, keep the rest as-is.
columns_to_string = ['drug_concept_id', 'drug_exposure_start_datetime', 'drug_exposure_end_datetime', 'drug_generic_source_value_name', 'vocabulary_id_drug']
drugs = drugs.select(*[
    col(name).cast('string').alias(name) if name in columns_to_string else col(name)
    for name in drugs.columns
])

In [None]:
# Write the per-visit drug lists as CSV; overwrite so the cell can be re-run.
drugs.write.mode("overwrite").option("header",True).csv("drugs")

In [None]:
# Assemble one row per (patient, visit) for the over-65 cohort.
# The visit keys come from visit_length, so every visit of a cohort patient is kept. Conditions,
# procedures and drugs are then left-joined per visit. Joining conditions on person_id alone would
# leave visit_occurrence_id null for patients without a "current" condition, and would drop the
# procedures and drugs of every visit that has no such condition.
visit_keys = visit_length.select('person_id', 'visit_occurrence_id')
patients = (
    demo.join(visit_keys, on='person_id', how="inner")
    .join(conditions, on=['visit_occurrence_id','person_id'], how="left")
)
patients2 = patients.join(procedure_occurrence, on=['visit_occurrence_id','person_id'], how= "left")
patients3 = patients2.join(drugs, on=['visit_occurrence_id','person_id'], how= "left")

In [None]:
# Make sure every list-derived column is a string before writing CSV; other columns pass through
# unchanged and in the same order.
columns_to_string = ['drug_concept_id', 'drug_exposure_start_datetime', 'drug_exposure_end_datetime', 'drug_generic_source_value_name', 'vocabulary_id_drug', 'procedure_datetime', 'procedure_source_value', 'concept_name_procedure', 'vocabulary_id_procedure', 'condition_concept_id', 'condition_source_value', 'concept_name', 'domain_id', 'vocabulary_id', 'concept_class_id', 'concept_code']
patients3 = patients3.select(*[
    col(name).cast('string').alias(name) if name in columns_to_string else col(name)
    for name in patients3.columns
])

In [None]:
# Write the combined over-65 patient table as CSV; overwrite so the cell can be re-run.
# NOTE(review): "/test.csv" is an absolute path at the filesystem root, which typically needs elevated
# permissions on a local filesystem. An earlier draft wrote to 'patients_over_65.csv' — confirm the
# intended output location.
patients3.write.mode("overwrite").option("header",True).csv("/test.csv")

Prototype function

In [None]:
# Prototype helper for the quality checks: groups a DataFrame's columns by data type.

def dataframe_column_types(df):
    """Classify the columns of a Spark DataFrame into numerical, string, null and timestamp groups.

    Each column whose type belongs to one of the four groups is printed as
    ``<name> <type> <group>`` and appended to that group's list. Columns of any other
    type (dates, booleans, arrays, ...) are ignored.

    Args:
        df: a pyspark DataFrame (anything exposing ``.columns`` and ``.schema[name].dataType``).

    Returns:
        list: ``[num_cols, string_cols, null_cols, time_cols]``, each a list of column
        names in ``df.columns`` order.
    """
    # isinstance() instead of `dtype in [LongType(), ..., DecimalType()]`: DataType equality also
    # compares parameters, so DecimalType() only matched decimal(10,0) and e.g. decimal(38,18)
    # columns were never classified. ByteType is the remaining integral type and is included too.
    numerical_types = (ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
    num_cols = []
    string_cols = []
    null_cols = []
    time_cols = []
    # `name` rather than `col`, so the pyspark `col` function imported at the top is not shadowed
    for name in df.columns:
        dtype_col = df.schema[name].dataType
        if isinstance(dtype_col, numerical_types):
            print(name, dtype_col, "numerical")
            num_cols.append(name)
        elif isinstance(dtype_col, StringType):
            print(name, dtype_col, "string")
            string_cols.append(name)
        elif isinstance(dtype_col, NullType):
            print(name, dtype_col, "null")
            null_cols.append(name)
        elif isinstance(dtype_col, TimestampType):
            print(name, dtype_col, "time")
            time_cols.append(name)
    return [num_cols, string_cols, null_cols, time_cols]
        