In [1]:
import pyspark
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,TimestampType,BooleanType

# Widen the notebook container to 95% of the browser window by injecting CSS.
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

# NOTE(review): this assigns an attribute on the Python `IPython` module; output
# auto-scrolling is normally a front-end setting — confirm this has any effect.
import IPython
IPython.auto_scroll_threshold = 9999

# Display the value of every top-level expression in a cell, not only the last one.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
from IPython.display import display

import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

# Patients

In [2]:
# Batch-read the `patient` Kafka topic from the earliest to the latest offset.
# NOTE(review): `spark` is not created in this notebook — presumably the
# session is injected by the kernel; confirm before running on a fresh kernel.
patients_raw = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "patient")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [3]:
@udf('boolean')
def is_Defined_PatientRace(event_as_json):
    """Row filter: True when a patient event carries a known ``PatientRace``.

    Args:
        event_as_json: JSON-encoded event string (the Kafka message value cast
            to string). May be None when the Kafka record has a null value.

    Returns:
        True if the payload is a JSON object that has a ``PatientRace`` key
        whose value is not ``'Unknown'``. False otherwise, including for null,
        malformed, or non-object payloads and for events without the key, so
        that one bad record is filtered out instead of failing the whole job.
    """
    # A null Kafka value becomes None after the string cast.
    if event_as_json is None:
        return False
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass: malformed payload.
        return False
    # Valid JSON that is not an object (list, number, ...) has no such field.
    if not isinstance(event, dict) or 'PatientRace' not in event:
        return False
    return event['PatientRace'] != 'Unknown'

In [4]:
@udf('boolean')
def is_Defined_PatientLanguage(event_as_json):
    """Row filter: True when a patient event carries a known ``PatientLanguage``.

    Args:
        event_as_json: JSON-encoded event string (the Kafka message value cast
            to string). May be None when the Kafka record has a null value.

    Returns:
        True if the payload is a JSON object that has a ``PatientLanguage`` key
        whose value is not ``'Unknown'``. False otherwise, including for null,
        malformed, or non-object payloads and for events without the key, so
        that one bad record is filtered out instead of failing the whole job.
    """
    # A null Kafka value becomes None after the string cast.
    if event_as_json is None:
        return False
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass: malformed payload.
        return False
    # Valid JSON that is not an object (list, number, ...) has no such field.
    if not isinstance(event, dict) or 'PatientLanguage' not in event:
        return False
    return event['PatientLanguage'] != 'Unknown'

In [5]:
def patient_schema():
    """Build the schema used to parse patient events with ``from_json``.

    Returns:
        A StructType with one nullable string field per patient attribute,
        in the order listed below.
    """
    field_names = (
        "PatientID",
        "PatientGender",
        "PatientDateOfBirth",
        "PatientRace",
        "PatientMaritalStatus",
        "PatientLanguage",
        "PatientPopulationPercentageBelowPoverty",
    )
    return StructType(
        [StructField(field_name, StringType(), True) for field_name in field_names]
    )

In [6]:
# Peek at the raw Kafka records; `value` is still binary at this stage.
patients_raw.show(truncate = True)

+----+--------------------+-------+---------+------+--------------------+-------------+
| key|               value|  topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-------+---------+------+--------------------+-------------+
|null|[7B 22 50 61 74 6...|patient|        0|     0|2020-12-04 22:57:...|            0|
|null|[7B 22 50 61 74 6...|patient|        0|     1|2020-12-04 22:57:...|            0|
|null|[7B 22 50 61 74 6...|patient|        0|     2|2020-12-04 22:58:...|            0|
+----+--------------------+-------+---------+------+--------------------+-------------+



In [7]:
# Keep only patient events whose race and language are known, then expand the
# JSON payload into the columns declared by patient_schema(), alongside the
# raw event string and the Kafka timestamp.
patients_clean = (
    patients_raw
    .filter(is_Defined_PatientRace(patients_raw.value.cast('string')))
    .filter(is_Defined_PatientLanguage(patients_raw.value.cast('string')))
    .select(
        patients_raw.value.cast('string').alias('raw_event'),
        patients_raw.timestamp.cast('string'),
        from_json(patients_raw.value.cast('string'), patient_schema()).alias('json'),
    )
    .select('raw_event', 'timestamp', 'json.*')
)

In [8]:
# Check the flattened column names and types of the cleaned patient frame.
patients_clean.printSchema()

root
 |-- raw_event: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- PatientID: string (nullable = true)
 |-- PatientGender: string (nullable = true)
 |-- PatientDateOfBirth: string (nullable = true)
 |-- PatientRace: string (nullable = true)
 |-- PatientMaritalStatus: string (nullable = true)
 |-- PatientLanguage: string (nullable = true)
 |-- PatientPopulationPercentageBelowPoverty: string (nullable = true)



In [9]:
# Preview the first 5 cleaned patient rows.
patients_clean.show(5)

+--------------------+--------------------+--------------------+-------------+--------------------+----------------+--------------------+---------------+---------------------------------------+
|           raw_event|           timestamp|           PatientID|PatientGender|  PatientDateOfBirth|     PatientRace|PatientMaritalStatus|PatientLanguage|PatientPopulationPercentageBelowPoverty|
+--------------------+--------------------+--------------------+-------------+--------------------+----------------+--------------------+---------------+---------------------------------------+
|{"PatientID":"302...|2020-12-04 22:57:...|302130A8-171B-414...|       Female|1984-12-06 14:09:...|           White|             Married|        English|                                    3.2|
|{"PatientID":"EDA...|2020-12-04 22:57:...|EDA9123E-E39F-420...|       Female|1930-01-13 09:32:...|           White|             Married|        English|                                  10.25|
|{"PatientID":"EDF...|2020-12-

# Admissions

In [10]:
# Batch-read the `admissions` Kafka topic from the earliest to the latest offset.
# NOTE(review): `spark` is not created in this notebook — presumably the
# session is injected by the kernel; confirm before running on a fresh kernel.
admissions_raw = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "admissions")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [11]:
def admissions_schema():
    """Build the schema used to parse admission events with ``from_json``.

    Returns:
        A StructType with one nullable string field per admission attribute,
        in the order listed below.
    """
    field_names = (
        "PatientID",
        "AdmissionID",
        "AdmissionStartDate",
        "AdmissionEndDate",
        "PrimaryDiagnosisCode",
        "PrimaryDiagnosisDescription",
    )
    return StructType(
        [StructField(field_name, StringType(), True) for field_name in field_names]
    )

In [12]:
# Peek at the raw Kafka records; `value` is still binary at this stage.
admissions_raw.show(truncate = True)

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|null|[7B 22 49 44 22 3...|admissions|        0|     0|2020-12-04 22:57:...|            0|
|null|[7B 22 49 44 22 3...|admissions|        0|     1|2020-12-04 22:57:...|            0|
|null|[7B 22 49 44 22 3...|admissions|        0|     2|2020-12-04 22:57:...|            0|
+----+--------------------+----------+---------+------+--------------------+-------------+



In [13]:
# Expand each admission event's JSON payload into the columns declared by
# admissions_schema(), alongside the raw event string and the Kafka timestamp.
admissions_clean = (
    admissions_raw
    .select(
        admissions_raw.value.cast('string').alias('raw_event'),
        admissions_raw.timestamp.cast('string'),
        from_json(admissions_raw.value.cast('string'), admissions_schema()).alias('json'),
    )
    .select('raw_event', 'timestamp', 'json.*')
)

In [14]:
# Check the flattened column names and types of the cleaned admissions frame.
admissions_clean.printSchema()

root
 |-- raw_event: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- PatientID: string (nullable = true)
 |-- AdmissionID: string (nullable = true)
 |-- AdmissionStartDate: string (nullable = true)
 |-- AdmissionEndDate: string (nullable = true)
 |-- PrimaryDiagnosisCode: string (nullable = true)
 |-- PrimaryDiagnosisDescription: string (nullable = true)



In [15]:
# Preview the first 5 cleaned admission rows.
admissions_clean.show(5)

+--------------------+--------------------+--------------------+-----------+------------------+----------------+--------------------+---------------------------+
|           raw_event|           timestamp|           PatientID|AdmissionID|AdmissionStartDate|AdmissionEndDate|PrimaryDiagnosisCode|PrimaryDiagnosisDescription|
+--------------------+--------------------+--------------------+-----------+------------------+----------------+--------------------+---------------------------+
|{"ID":17909,"Pati...|2020-12-04 22:57:...|040F3153-F8D4-44B...|          2|         4/25/2001|        5/2/2001|             E11.621|       Type 2 diabetes m...|
|{"ID":1029,"Patie...|2020-12-04 22:57:...|BB0C8FFA-8028-451...|          2|         3/13/2002|       3/17/2002|                 H62|       Disorders of exte...|
|{"ID":16100,"Pati...|2020-12-04 22:57:...|AF712E2F-19F0-459...|          2|         3/23/2012|        4/3/2012|              M11.01|       Hydroxyapatite de...|
+--------------------+------

# Lab Results

In [16]:
# Batch-read the `lab_results` Kafka topic from the earliest to the latest offset.
# NOTE(review): `spark` is not created in this notebook — presumably the
# session is injected by the kernel; confirm before running on a fresh kernel.
lab_results_raw = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "lab_results")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [17]:
def lab_results_schema():
    """Build the schema used to parse lab-result events with ``from_json``.

    Returns:
        A StructType with one nullable string field per lab-result attribute,
        in the order listed below.
    """
    field_names = (
        "PatientID",
        "AdmissionID",
        "LabName",
        "LabValue",
        "LabUnits",
        "LabDateTime",
    )
    return StructType(
        [StructField(field_name, StringType(), True) for field_name in field_names]
    )

In [18]:
# Peek at the raw Kafka records; `value` is still binary at this stage.
lab_results_raw.show(truncate = True)

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 55 6E 6E 6...|lab_results|        0|     0|2020-12-04 22:57:...|            0|
+----+--------------------+-----------+---------+------+--------------------+-------------+



In [19]:
# Expand each lab-result event's JSON payload into the columns declared by
# lab_results_schema(), alongside the raw event string and the Kafka timestamp.
lab_results_clean = (
    lab_results_raw
    .select(
        lab_results_raw.value.cast('string').alias('raw_event'),
        lab_results_raw.timestamp.cast('string'),
        from_json(lab_results_raw.value.cast('string'), lab_results_schema()).alias('json'),
    )
    .select('raw_event', 'timestamp', 'json.*')
)

In [20]:
# Check the flattened column names and types of the cleaned lab-results frame.
lab_results_clean.printSchema()

root
 |-- raw_event: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- PatientID: string (nullable = true)
 |-- AdmissionID: string (nullable = true)
 |-- LabName: string (nullable = true)
 |-- LabValue: string (nullable = true)
 |-- LabUnits: string (nullable = true)
 |-- LabDateTime: string (nullable = true)



In [21]:
# Preview up to 10 cleaned lab-result rows.
lab_results_clean.show(10)

+--------------------+--------------------+--------------------+-----------+--------+--------+--------+--------------------+
|           raw_event|           timestamp|           PatientID|AdmissionID| LabName|LabValue|LabUnits|         LabDateTime|
+--------------------+--------------------+--------------------+-----------+--------+--------+--------+--------------------+
|{"Unnamed: 0":362...|2020-12-04 22:57:...|1BE02262-CCBF-4BB...|          5|CBC: MCH|    28.0|      pg|2009-08-14 08:16:...|
+--------------------+--------------------+--------------------+-----------+--------+--------+--------+--------------------+

