In [2]:
pip install pyspark==3.5.4

Note: you may need to restart the kernel to use updated packages.


In [1]:
from pyspark.sql.functions import current_timestamp, col
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType
from pyspark.sql import SparkSession
from delta.tables import *
from delta import *
from delta import configure_spark_with_delta_pip

In [2]:
builder = SparkSession \
        .builder \
        .appName('healthcare_ingest')\
        .master('local')\
        .config("spark.driver.memory","2g")\
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        
spark = configure_spark_with_delta_pip(builder).getOrCreate()


In [3]:
spark

In [7]:
landing_folder = "landing_zone"

In [8]:
patients = spark.read \
            .option("inferschema", "True") \
            .option("header", "True") \
            .csv(f"{landing_folder}\\health_records\\patients.csv")

In [9]:
patients.printSchema()

root
 |-- Id: string (nullable = true)
 |-- BIRTHDATE: date (nullable = true)
 |-- DEATHDATE: date (nullable = true)
 |-- PREFIX: string (nullable = true)
 |-- FIRST: string (nullable = true)
 |-- LAST: string (nullable = true)
 |-- SUFFIX: string (nullable = true)
 |-- MAIDEN: string (nullable = true)
 |-- MARITAL: string (nullable = true)
 |-- RACE: string (nullable = true)
 |-- ETHNICITY: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- BIRTHPLACE: string (nullable = true)
 |-- ADDRESS: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTY: string (nullable = true)
 |-- ZIP: integer (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)



In [39]:
patients.count()

974

In [40]:
patients_renamed = patients.withColumnRenamed("Id", "patients_id")

In [41]:
patients.show()

+--------------------+----------+----------+------+------------+--------------+------+-----------+-------+------+-----------+------+--------------------+--------------------+----------+-------------+----------------+----+------------------+------------+
|                  Id| BIRTHDATE| DEATHDATE|PREFIX|       FIRST|          LAST|SUFFIX|     MAIDEN|MARITAL|  RACE|  ETHNICITY|GENDER|          BIRTHPLACE|             ADDRESS|      CITY|        STATE|          COUNTY| ZIP|               LAT|         LON|
+--------------------+----------+----------+------+------------+--------------+------+-----------+-------+------+-----------+------+--------------------+--------------------+----------+-------------+----------------+----+------------------+------------+
|5605b66b-e92d-c16...|1977-03-19|      NULL|  Mrs.|   Nikita578|     Erdman779|  NULL|  Leannon79|      M| white|nonhispanic|     F|Wakefield  Massac...|510 Little Statio...|    Quincy|Massachusetts|  Norfolk County|2186|42.29093738121128

In [42]:
procedure = spark.read \
            .option("inferschema", "True") \
            .option("header", "True") \
            .csv(f"{landing_folder}\\health_records\\procedures.csv")

In [43]:
procedure.printSchema()

root
 |-- START: timestamp (nullable = true)
 |-- STOP: timestamp (nullable = true)
 |-- PATIENT: string (nullable = true)
 |-- ENCOUNTER: string (nullable = true)
 |-- CODE: long (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- BASE_COST: integer (nullable = true)
 |-- REASONCODE: long (nullable = true)
 |-- REASONDESCRIPTION: string (nullable = true)



In [44]:
procedure_renamed = procedure.withColumnRenamed("PATIENT", "PATIENT_ID") \
                             .withColumnRenamed("ENCOUNTER", "ENCOUNTER_ID")

In [45]:
procedure_renamed.count()

47701

In [46]:
procedure.show()

+-------------------+-------------------+--------------------+--------------------+---------------+--------------------+---------+----------+--------------------+
|              START|               STOP|             PATIENT|           ENCOUNTER|           CODE|         DESCRIPTION|BASE_COST|REASONCODE|   REASONDESCRIPTION|
+-------------------+-------------------+--------------------+--------------------+---------------+--------------------+---------+----------+--------------------+
|2011-01-02 10:26:36|2011-01-02 13:58:36|3de74169-7f67-930...|32c84703-2481-49c...|      265764009|Renal dialysis (p...|      903|      NULL|                NULL|
|2011-01-03 06:44:39|2011-01-03 07:01:42|d9ec2e44-32e9-914...|c98059da-320a-c0a...|       76601001|Intramuscular inj...|     2477|      NULL|                NULL|
|2011-01-04 15:49:55|2011-01-04 16:04:55|d856d6e6-4c98-e7a...|2cfd4ddd-ad13-fe1...|      703423002|Combined chemothe...|    11620| 363406005|Malignant tumor o...|
|2011-01-05 05:02:09|2

In [47]:
encounters = spark.read \
            .option("inferschema", "True") \
            .option("header", "True") \
            .csv(f"{landing_folder}\\health_records\\encounters.csv")

In [48]:
encounters.count()

27891

In [49]:
encounters.show()

+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------+---------+--------------------+-------------------+----------------+--------------+----------+--------------------+
|                  Id|              START|               STOP|             PATIENT|        ORGANIZATION|               PAYER|ENCOUNTERCLASS|     CODE|         DESCRIPTION|BASE_ENCOUNTER_COST|TOTAL_CLAIM_COST|PAYER_COVERAGE|REASONCODE|   REASONDESCRIPTION|
+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------+---------+--------------------+-------------------+----------------+--------------+----------+--------------------+
|32c84703-2481-49c...|2011-01-02 10:26:36|2011-01-02 13:58:36|3de74169-7f67-930...|d78e84ec-30aa-3bb...|b1c428d6-4f07-31e...|    ambulatory|185347001|Encounter for pro...|              85.55|         1018.02|           0.0|      NUL

In [50]:
encounters.printSchema()

root
 |-- Id: string (nullable = true)
 |-- START: timestamp (nullable = true)
 |-- STOP: timestamp (nullable = true)
 |-- PATIENT: string (nullable = true)
 |-- ORGANIZATION: string (nullable = true)
 |-- PAYER: string (nullable = true)
 |-- ENCOUNTERCLASS: string (nullable = true)
 |-- CODE: integer (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- BASE_ENCOUNTER_COST: double (nullable = true)
 |-- TOTAL_CLAIM_COST: double (nullable = true)
 |-- PAYER_COVERAGE: double (nullable = true)
 |-- REASONCODE: long (nullable = true)
 |-- REASONDESCRIPTION: string (nullable = true)



In [51]:
encounters_renamed = encounters.withColumnRenamed("Id", "encounters_id") \
                               .withColumnRenamed("PATIENT", "PATIENT_ID")

In [52]:
payers = spark.read \
            .option("inferschema", "True") \
            .option("header", "True") \
            .csv(f"{landing_folder}\\health_records\\payers.csv")

In [53]:
payers.printSchema()

root
 |-- Id: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- ADDRESS: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE_HEADQUARTERED: string (nullable = true)
 |-- ZIP: integer (nullable = true)
 |-- PHONE: string (nullable = true)



In [54]:
payers_renamed = payers.withColumnRenamed("Id", "payers_id") \
                       .withColumnRenamed("NAME", "PAYER_NAME")

In [55]:
payers_renamed.count()

10

In [56]:
payers_renamed.show()

+--------------------+--------------------+--------------------+------------+-------------------+-----+--------------+
|           payers_id|          PAYER_NAME|             ADDRESS|        CITY|STATE_HEADQUARTERED|  ZIP|         PHONE|
+--------------------+--------------------+--------------------+------------+-------------------+-----+--------------+
|b3221cfc-24fb-339...|       Dual Eligible|  7500 Security Blvd|   Baltimore|                 MD|21244|1-877-267-2323|
|7caa7254-5050-3b5...|            Medicare|  7500 Security Blvd|   Baltimore|                 MD|21244|1-800-633-4227|
|7c4411ce-02f1-39b...|            Medicaid|  7500 Security Blvd|   Baltimore|                 MD|21244|1-877-267-2323|
|d47b3510-2895-3b7...|              Humana|    500 West Main St|  Louisville|                 KY|40018|1-844-330-7799|
|6e2f1a2d-27bd-370...|Blue Cross Blue S...|      Michigan Plaza|     Chicago|                 IL|60007|1-800-262-2583|
|5059a55e-5d6e-34d...|    UnitedHealthcare|9800 

In [57]:
organizations = spark.read \
            .option("inferschema", "True") \
            .option("header", "True") \
            .csv(f"{landing_folder}\\health_records\\organizations.csv")

In [58]:
organizations.printSchema()

root
 |-- Id: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- ADDRESS: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIP: integer (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)



In [59]:
organizations_renamed = organizations.withColumnRenamed("Id", "organizations_id")

In [60]:
organizations_renamed.count()

1

In [61]:
organizations_renamed.show()

+--------------------+--------------------+---------------+------+-----+----+---------+----------+
|    organizations_id|                NAME|        ADDRESS|  CITY|STATE| ZIP|      LAT|       LON|
+--------------------+--------------------+---------------+------+-----+----+---------+----------+
|d78e84ec-30aa-3bb...|MASSACHUSETTS GEN...|55 FRUIT STREET|BOSTON|   MA|2114|42.362813|-71.069187|
+--------------------+--------------------+---------------+------+-----+----+---------+----------+



In [62]:
description = spark.read \
            .option("inferschema", "True") \
            .option("header", "True") \
            .csv(f"{landing_folder}\\health_records\\data_dictionary.csv")

In [63]:
description.printSchema()

root
 |-- Table: string (nullable = true)
 |-- Field: string (nullable = true)
 |-- Description: string (nullable = true)



In [75]:
try:
    patients_renamed.write.format("delta").mode("overwrite").save(f"{bronze_folder}\\healthcare_delta\\patients")
    print("table created")
except Exception as e:
    print("Table creation failed")
    print (e)


table created


In [69]:

procedure_renamed.write.format("delta").mode("overwrite").save(f"{bronze_folder}\\healthcare_delta\\procedure")

In [70]:
encounters_renamed.write.format("delta").mode("overwrite").save(f"{bronze_folder}\\healthcare_delta\\encounters")

In [71]:
payers_renamed.write.format("delta").mode("overwrite").save(f"{bronze_folder}\\healthcare_delta\\payers")

In [72]:
organizations_renamed.write.format("delta").mode("overwrite").save(f"{bronze_folder}\\healthcare_delta\\organizations")

In [73]:
description.write.format("delta").mode("overwrite").save(f"{bronze_folder}\\healthcare_delta\\description")

In [74]:
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x000002499A201750>>