In [None]:
from pyspark.sql import SparkSession
from delta.tables import *
import os
import pandas as pd
from pyspark.sql.functions import col, sum, count, when, coalesce, lit
from pyspark.sql.types import DoubleType
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

# Initialize Spark Session
# spark = SparkSession.builder.appName("Data Ingestion").getOrCreate()

spark.conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
spark.conf.set("spark.cassandra.connection.host", "aa93594f-b5ab-4912-994e-6156fb347536-us-east1.db.astra.datastax.com")
spark.conf.set("spark.cassandra.connection.port", "29042")  # CQL Port

# Configure SSL/TLS
spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
spark.conf.set("spark.cassandra.connection.ssl.trustStore.path", "./trustStore.jks")
spark.conf.set("spark.cassandra.connection.ssl.trustStore.password", "0ub5Xk2C3Pzqjhy1m")
spark.conf.set("spark.cassandra.connection.ssl.clientAuth.enabled", "true")
spark.conf.set("spark.cassandra.connection.ssl.keyStore.path", "./identity.jks")
spark.conf.set("spark.cassandra.connection.ssl.keyStore.password", "G1BrN3bZO47AQMI6X")

# spark.conf.set("spark.driver.extraJavaOptions", "-Djavax.net.debug=ssl")
# spark.conf.set("spark.executor.extraJavaOptions", "-Djavax.net.debug=ssl")


# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Data Ingestion") \
    .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .config("spark.cassandra.connection.host", "aa93594f-b5ab-4912-994e-6156fb347536-us-east1.db.astra.datastax.com") \
    .config("spark.cassandra.connection.port", "29042") \
    .config("spark.cassandra.connection.ssl.enabled", "true") \
    .config("spark.cassandra.connection.ssl.trustStore.path", "./trustStore.jks") \
    .config("spark.cassandra.connection.ssl.trustStore.password", "0ub5Xk2C3Pzqjhy1m") \
    .config("spark.cassandra.connection.ssl.clientAuth.enabled", "true") \
    .config("spark.cassandra.connection.ssl.keyStore.path", "./identity.jks") \
    .config("spark.cassandra.connection.ssl.keyStore.password", "G1BrN3bZO47AQMI6X") \
    .getOrCreate()



# Path to the service account JSON key in DBFS
service_account_path = "/dbfs/FileStore/shared_uploads/auth/noob2_bootcamp_407704_058a42626b1b.json"


# Configure Spark to use the service account JSON key for GCS authentication
spark.conf.set("fs.gs.auth.service.account.json.keyfile", service_account_path)


# GCS bucket details
bucket_name = "healthcare_analysis"
data_directory = f"gs://{bucket_name}/input/"
archive_directory = f"gs://{bucket_name}/archive/"

In [None]:
# Read all CSV files from the specified GCS directory
df = spark.read.csv(data_directory, inferSchema=True, header=True)

df.show()

+----------+---+------+--------------+---------------------+--------------+
|patient_id|age|gender|diagnosis_code|diagnosis_description|diagnosis_date|
+----------+---+------+--------------+---------------------+--------------+
|        P1| 45|     M|          H234|  High Blood Pressure|    2023-08-01|
|        P2| 32|     F|          D123|             Diabetes|    2023-08-01|
|        P3| 39|     F|          H234|  High Blood Pressure|    2023-08-01|
|        P4| 40|     F|          C345|               Cancer|    2023-08-01|
|        P5| 52|     M|          H234|  High Blood Pressure|    2023-08-01|
|        P6| 43|     F|          C345|               Cancer|    2023-08-01|
|        P7| 51|     M|          D123|             Diabetes|    2023-08-01|
|        P8| 67|     F|          H234|  High Blood Pressure|    2023-08-01|
|        P9| 32|     F|          D123|             Diabetes|    2023-08-01|
|       P10| 63|     M|          H234|  High Blood Pressure|    2023-08-01|
|       P11|

In [None]:
# Check for null values in each column
null_counts = df.agg(
    *[sum(col(column).isNull().cast("int")).alias(f"{column}_null_count") for column in df.columns]
)

# Check for data types
data_type_checks = [col(column).cast("string").alias(f"{column}_type_check") for column in df.columns]

# Apply the data type checks
df_check = df.select(data_type_checks)

# Show the results of the checks
print("Null Counts:")
null_counts.show()

print("Data Type Checks:")
df_check.show()

Null Counts:
+---------------------+--------------+-----------------+-------------------------+--------------------------------+-------------------------+
|patient_id_null_count|age_null_count|gender_null_count|diagnosis_code_null_count|diagnosis_description_null_count|diagnosis_date_null_count|
+---------------------+--------------+-----------------+-------------------------+--------------------------------+-------------------------+
|                    0|             0|                0|                        0|                               0|                        0|
+---------------------+--------------+-----------------+-------------------------+--------------------------------+-------------------------+

Data Type Checks:
+---------------------+--------------+-----------------+-------------------------+--------------------------------+-------------------------+
|patient_id_type_check|age_type_check|gender_type_check|diagnosis_code_type_check|diagnosis_description_type_check|d

In [None]:
# Group by diagnosis_code and gender, and calculate the count for each group
gender_counts = df.groupBy("diagnosis_code","diagnosis_description","gender").agg(count("patient_id").alias("counter"))

# Pivot the data to get Male and Female counts as separate columns
gender_pivoted = gender_counts.groupBy("diagnosis_code","diagnosis_description").pivot("gender").agg(
    coalesce(sum(when(col("gender") == "M", col("counter"))), lit(0)).alias("Males"),
    coalesce(sum(when(col("gender") == "F", col("counter"))), lit(0)).alias("Females"))

# Drop F_Males and M_females columns from gender_pivoted DataFrame
gender_pivoted = gender_pivoted.drop("F_Males","M_Females")

# Calculate the gender ratio
gender_ratio = gender_pivoted.withColumn("Gender_Ratio", col("M_Males") / col("F_Females"))

# Show updated DataFrame
gender_ratio.show()

+--------------+---------------------+---------+-------+------------------+
|diagnosis_code|diagnosis_description|F_Females|M_Males|      Gender_Ratio|
+--------------+---------------------+---------+-------+------------------+
|          D123|             Diabetes|       16|     18|             1.125|
|          C345|               Cancer|       22|     11|               0.5|
|          H234|  High Blood Pressure|       17|     16|0.9411764705882353|
+--------------+---------------------+---------+-------+------------------+



In [None]:
# Connecting to CassandraDB using Datastax
# This secure connect bundle is autogenerated when you download your SCB, 
# if yours is different update the file name below
cloud_config= {
  'secure_connect_bundle': '/dbfs/FileStore/shared_uploads/secure_connect_healthcare_db.zip'
}

# This token JSON file is autogenerated when you download your token, 
# if yours is different update the file name below
with open("/dbfs/FileStore/shared_uploads/healthcare_db_token__1_.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

row = session.execute("select release_version from system.local").one()
if row:
  print("Cassandra Connection Sucessful")
else:
  print("An error occurred.")

keyspace="healthcare"
table='stage_disease_ratio'

Cassandra Connection Sucessful


In [None]:
# Check if the table exists
existing_table_query = f"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table}'"
existing_table_result = session.execute(existing_table_query)

if existing_table_result.one():
    # Table exists, truncate (delete all data)
    truncate_query = f"TRUNCATE TABLE {keyspace}.{table}"
    session.execute(truncate_query)
else:
    # Table does not exist, create it
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS healthcare.stage_disease_ratio (
        diagnosis_code TEXT PRIMARY KEY,
        diagnosis_description TEXT,
        F_Females INT,
        M_Males INT,
        Gender_Ratio DOUBLE
    )
    """
    session.execute(create_table_query)

# Convert Spark DataFrame to Pandas DataFrame
pandas_df = gender_ratio.toPandas()

# Insert data into Cassandra table
for index, row in pandas_df.iterrows():
    insert_query = f"""
    INSERT INTO healthcare.stage_disease_ratio
    (diagnosis_code, diagnosis_description, F_Females, M_Males, Gender_Ratio)
    VALUES ('{row['diagnosis_code']}', '{row['diagnosis_description']}', 
            {row['F_Females']}, {row['M_Males']}, {row['Gender_Ratio']})
    """
    session.execute(insert_query)


In [None]:
# CHECKER

query = "SELECT * FROM healthcare.stage_disease_ratio"
result = session.execute(query)

# Print the results
for row in result:
    print(row)

Row(diagnosis_code='C345', diagnosis_description='Cancer', f_females=22, gender_ratio=0.5, m_males=11)
Row(diagnosis_code='D123', diagnosis_description='Diabetes', f_females=16, gender_ratio=1.125, m_males=18)
Row(diagnosis_code='H234', diagnosis_description='High Blood Pressure', f_females=17, gender_ratio=0.9411764705882353, m_males=16)


In [None]:
df.createOrReplaceTempView("patient_data")

# Use SQL to find the top 3 common diseases
top3_query = """
    SELECT
        ROW_NUMBER() OVER (ORDER BY count(*) DESC) AS Rank,
        diagnosis_code,
        diagnosis_description
    FROM
        patient_data
    GROUP BY
        diagnosis_code, diagnosis_description
    ORDER BY
        Rank
    LIMIT 3
"""

top3 = spark.sql(top3_query)

# Show the resulting DataFrame
top3.show()

+----+--------------+---------------------+
|Rank|diagnosis_code|diagnosis_description|
+----+--------------+---------------------+
|   1|          D123|             Diabetes|
|   2|          C345|               Cancer|
|   3|          H234|  High Blood Pressure|
+----+--------------+---------------------+



In [None]:
keyspace="healthcare"
table='stage_top3'


# Check if the table exists
existing_table_query = f"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table}'"
existing_table_result = session.execute(existing_table_query)

if existing_table_result.one():
    # Table exists, truncate (delete all data)
    truncate_query = f"TRUNCATE TABLE {keyspace}.{table}"
    session.execute(truncate_query)
else:
    # Table does not exist, create it
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS healthcare.stage_top3 (
        Rank INT PRIMARY KEY,
        Diagnosis_code TEXT,
        Diagnosis_description TEXT
    )
    """
    session.execute(create_table_query)

# Convert Spark DataFrame to Pandas DataFrame
pandas_df = top3.toPandas()

# Insert data into Cassandra table
for index, row in pandas_df.iterrows():
    insert_query = f"""
    INSERT INTO healthcare.stage_top3
    (Rank,Diagnosis_code, Diagnosis_description)
    VALUES ('{row['Rank']}', '{row['diagnosis_code']}', '{row['diagnosis_description']}')
    """
    session.execute(insert_query)

In [None]:
# CHECKER

query = "SELECT * FROM healthcare.stage_top3"
result = session.execute(query)

# Print the results
for row in result:
    print(row)

Row(rank='3', diagnosis_code='H234', diagnosis_description='High Blood Pressure')
Row(rank='2', diagnosis_code='C345', diagnosis_description='Cancer')
Row(rank='1', diagnosis_code='D123', diagnosis_description='Diabetes')


In [None]:


# Use SQL to create age buckets
distro_query = """
    SELECT
        diagnosis_code,
        diagnosis_description,
        CASE
            WHEN age >= 30 AND age < 40 THEN '30-40'
            WHEN age >= 40 AND age < 50 THEN '41-50'
            WHEN age >= 50 AND age < 60 THEN '51-60'
            WHEN age >= 60 THEN '61+'
        END AS age_category,
        COUNT(patient_id) AS patient_count,
        CONCAT(diagnosis_code, '_', 
            CASE
               WHEN age >= 30 AND age < 40 THEN '30-40'
               WHEN age >= 40 AND age < 50 THEN '41-50'
               WHEN age >= 50 AND age < 60 THEN '51-60'
               WHEN age >= 60 THEN '61+'
            END) AS code_age_category
    FROM
        patient_data
    GROUP BY
        diagnosis_code, diagnosis_description, age_category
"""

distro = spark.sql(distro_query)

# Show the resulting DataFrame
distro.show()

+--------------+---------------------+------------+-------------+-----------------+
|diagnosis_code|diagnosis_description|age_category|patient_count|code_age_category|
+--------------+---------------------+------------+-------------+-----------------+
|          H234|  High Blood Pressure|       51-60|            9|       H234_51-60|
|          D123|             Diabetes|         61+|            9|         D123_61+|
|          H234|  High Blood Pressure|         61+|            9|         H234_61+|
|          C345|               Cancer|       30-40|            6|       C345_30-40|
|          D123|             Diabetes|       30-40|            8|       D123_30-40|
|          D123|             Diabetes|       51-60|           12|       D123_51-60|
|          C345|               Cancer|         61+|            8|         C345_61+|
|          D123|             Diabetes|       41-50|            5|       D123_41-50|
|          C345|               Cancer|       41-50|           14|       C345

In [None]:
keyspace="healthcare"
table='stage_age_distro'


# Check if the table exists
existing_table_query = f"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table}'"
existing_table_result = session.execute(existing_table_query)

if existing_table_result.one():
    # Table exists, truncate (delete all data)
    truncate_query = f"TRUNCATE TABLE {keyspace}.{table}"
    session.execute(truncate_query)
else:
    # Table does not exist, create it
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS healthcare.stage_age_distro (
        Diagnosis_code TEXT,
        Diagnosis_description TEXT,
        age_category TEXT,
        patient_count INT,
        code_age_category TEXT PRIMARY KEY
    )
    """
    session.execute(create_table_query)

# Convert Spark DataFrame to Pandas DataFrame
pandas_df = distro.toPandas()

# Insert data into Cassandra table
for index, row in pandas_df.iterrows():
    insert_query = f"""
    INSERT INTO healthcare.stage_age_distro
    (Diagnosis_code, Diagnosis_description,age_category,patient_count,code_age_category)
    VALUES ('{row['diagnosis_code']}', '{row['diagnosis_description']}','{row['age_category']}',{row['patient_count']},'{row['code_age_category']}')
    """
    session.execute(insert_query)

In [None]:
# CHECKER

query = "SELECT * FROM healthcare.stage_age_distro"
result = session.execute(query)

# Print the results
for row in result:
    print(row)

Row(code_age_category='D123_51-60', age_category='51-60', diagnosis_code='D123', diagnosis_description='Diabetes', patient_count=12)
Row(code_age_category='H234_41-50', age_category='41-50', diagnosis_code='H234', diagnosis_description='High Blood Pressure', patient_count=8)
Row(code_age_category='C345_61+', age_category='61+', diagnosis_code='C345', diagnosis_description='Cancer', patient_count=8)
Row(code_age_category='H234_61+', age_category='61+', diagnosis_code='H234', diagnosis_description='High Blood Pressure', patient_count=9)
Row(code_age_category='D123_41-50', age_category='41-50', diagnosis_code='D123', diagnosis_description='Diabetes', patient_count=5)
Row(code_age_category='C345_51-60', age_category='51-60', diagnosis_code='C345', diagnosis_description='Cancer', patient_count=5)
Row(code_age_category='H234_51-60', age_category='51-60', diagnosis_code='H234', diagnosis_description='High Blood Pressure', patient_count=9)
Row(code_age_category='H234_30-40', age_category='30-4

In [None]:
# Use SQL to flag senior citizens
senior_query = """
    SELECT
        patient_id,age,Case when age>=60 THEN 'Y'
        ELSE 'N' END as senior_citizen_flag
    FROM
        patient_data
"""

senior = spark.sql(senior_query)

# Show the resulting DataFrame
senior.show()

+----------+---+-------------------+
|patient_id|age|senior_citizen_flag|
+----------+---+-------------------+
|        P1| 45|                  N|
|        P2| 32|                  N|
|        P3| 39|                  N|
|        P4| 40|                  N|
|        P5| 52|                  N|
|        P6| 43|                  N|
|        P7| 51|                  N|
|        P8| 67|                  Y|
|        P9| 32|                  N|
|       P10| 63|                  Y|
|       P11| 61|                  Y|
|       P12| 67|                  Y|
|       P13| 42|                  N|
|       P14| 65|                  Y|
|       P15| 61|                  Y|
|       P16| 38|                  N|
|       P17| 69|                  Y|
|       P18| 62|                  Y|
|       P19| 38|                  N|
|       P20| 55|                  N|
+----------+---+-------------------+
only showing top 20 rows



In [None]:
keyspace="healthcare"
table='stage_senior_citizen'


# Check if the table exists
existing_table_query = f"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table}'"
existing_table_result = session.execute(existing_table_query)

if existing_table_result.one():
    # Table exists, truncate (delete all data)
    truncate_query = f"TRUNCATE TABLE {keyspace}.{table}"
    session.execute(truncate_query)
else:
    # Table does not exist, create it
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS healthcare.stage_senior_citizen (
        patient_id TEXT PRIMARY KEY,
        age INT,
        senior_citizen_flag TEXT,
    )
    """
    session.execute(create_table_query)

# Convert Spark DataFrame to Pandas DataFrame
pandas_df = senior.toPandas()

# Insert data into Cassandra table
for index, row in pandas_df.iterrows():
    insert_query = f"""
    INSERT INTO healthcare.stage_senior_citizen
    (patient_id,age,senior_citizen_flag)
    VALUES ('{row['patient_id']}',{row['age']},'{row['senior_citizen_flag']}')
    """
    session.execute(insert_query)

In [None]:
# CHECKER

query = "SELECT * FROM healthcare.stage_senior_citizen"
result = session.execute(query)

# Print the results
for row in result:
    print(row)

Row(patient_id='P78', age=34, senior_citizen_flag='N')
Row(patient_id='P24', age=53, senior_citizen_flag='N')
Row(patient_id='P54', age=44, senior_citizen_flag='N')
Row(patient_id='P97', age=56, senior_citizen_flag='N')
Row(patient_id='P39', age=31, senior_citizen_flag='N')
Row(patient_id='P41', age=50, senior_citizen_flag='N')
Row(patient_id='P61', age=35, senior_citizen_flag='N')
Row(patient_id='P46', age=47, senior_citizen_flag='N')
Row(patient_id='P11', age=61, senior_citizen_flag='Y')
Row(patient_id='P3', age=39, senior_citizen_flag='N')
Row(patient_id='P30', age=43, senior_citizen_flag='N')
Row(patient_id='P47', age=56, senior_citizen_flag='N')
Row(patient_id='P52', age=36, senior_citizen_flag='N')
Row(patient_id='P10', age=63, senior_citizen_flag='Y')
Row(patient_id='P58', age=63, senior_citizen_flag='Y')
Row(patient_id='P86', age=31, senior_citizen_flag='N')
Row(patient_id='P90', age=59, senior_citizen_flag='N')
Row(patient_id='P12', age=67, senior_citizen_flag='Y')
Row(patient

In [None]:
# List and move input files to the archieve folder individually
file_list = dbutils.fs.ls(data_directory)
for file in file_list:
    if file.name.endswith(".csv"):
        print(f"{file} Moved in archive folder")
        dbutils.fs.mv(file.path, os.path.join(archive_directory, file.name))

FileInfo(path='gs://healthcare_analysis/input/health_data_20230801.csv', name='health_data_20230801.csv', size=3763, modificationTime=1703838448883) Moved in archive folder
