In [1]:
import os
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.2'
spark_version = 'spark-3.4.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Fetched 229 kB in 2s (142 kB/s)
Reading package lists... Done


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
# NOTE: these star imports bring Spark Column functions such as round, sum, max and min
# into the namespace, shadowing the Python builtins of the same name in later cells.
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Local Spark using all available cores; reuses an existing session if one is running.
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Legacy SQLContext handle kept for compatibility.
# Its first parameter is a SparkContext, not a SparkSession, so pass the session's
# context and the session itself explicitly so it wraps this same session.
# Prefer `spark` directly for new queries.
sqlContext = SQLContext(spark.sparkContext, spark)



In [3]:
import pandas as pd

# Cleaned 2020 heart-disease survey extract hosted in the project repository.
heartdata2020 = (
    'https://raw.githubusercontent.com/oliverkisza/Final-Project-Team-1/'
    'main/Resources/2020_cleaned.csv'
)

# pandas downloads and parses the CSV; Spark then builds a DataFrame from the pandas frame.
pd_df = pd.read_csv(heartdata2020)
heart2020_df = spark.createDataFrame(pd_df)

# Preview only the first five rows.
heart2020_df.limit(5).show()

+------------+-----+-------+---------------+------+------------------+----------------+-----------------+------+-----------+-----+--------+------------------+-------------+------------+------+-------------+----------+
|HeartDisease|  BMI|Smoking|AlcoholDrinking|Stroke|PhysicalHealthDays|MentalHealthDays|DifficultyWalking|   Sex|AgeCategory| Race|Diabetic|PhysicalActivities|GeneralHealth|HoursOfSleep|Asthma|KidneyDisease|SkinCancer|
+------------+-----+-------+---------------+------+------------------+----------------+-----------------+------+-----------+-----+--------+------------------+-------------+------------+------+-------------+----------+
|          No| 16.6|    Yes|             No|    No|               3.0|            30.0|               No|Female|      55-59|White|     Yes|               Yes|    Very good|         5.0|   Yes|           No|       Yes|
|          No|20.34|     No|             No|   Yes|               0.0|             0.0|               No|Female|80 or older|Whit

In [4]:
# Cleaned 2022 heart-disease survey extract hosted in the project repository.
heartdata2022 = (
    'https://raw.githubusercontent.com/oliverkisza/Final-Project-Team-1/'
    'main/Resources/cleaned_df.csv'
)

# Overwrites the pd_df pandas frame from the 2020 load, then hands the rows to Spark.
pd_df = pd.read_csv(heartdata2022)
heart2022_df = spark.createDataFrame(pd_df)

# Preview only the first five rows.
heart2022_df.limit(5).show()

+-------------+---------------+------------+------------+---------------+-----------+----------+--------------+-----------------+-----+----------+--------+---------------------+----------------------+-----------------+------------------+------------+-------------+------------+-------------+------------+-------------+----------------+-----------------+----------+-----------+------------------------+-------------------------+-------------------+--------------------+---------------+----------------+----------------------+-----------------------+--------------------------+---------------------------+--------------------------+---------------------------+--------------------+---------------------+----------------------------+-----------------------------+--------------------+---------------------+------------+-------------+----------------------------------------------+------------------------------+-----------------------------------------------+--------------------------------------------

2020 Queries

In [5]:
# Expose the 2020 DataFrame to Spark SQL under the view name `heart20`.
heart2020_df.createOrReplaceTempView(name='heart20')

In [6]:
# Cache the heart20 view for the queries below; the trailing `;` hides the empty `DataFrame[]` repr.
spark.sql('cache table heart20');

DataFrame[]

In [7]:
# Sanity check: the cache statement above should leave heart20 cached (expect True).
spark.catalog.isCached(tableName='heart20')

True

In [8]:
# Prevalence (%) of each risk factor / comorbidity, split by heart-disease status.
# Each percentage divides by COUNT(*) directly rather than by the TOTAL alias:
# referencing an alias defined in the same SELECT list (a "lateral column alias")
# only resolves on Spark >= 3.4 and is silently shadowed by any real column named TOTAL.
a2020q1 = """
SELECT
  HeartDisease,
  COUNT(*) AS TOTAL,
  ROUND(COUNT(CASE WHEN Smoking = 'Yes' THEN Smoking END) / COUNT(*) * 100, 2) AS PERCENT_SMOKING,
  ROUND(COUNT(CASE WHEN AlcoholDrinking = 'Yes' THEN AlcoholDrinking END) / COUNT(*) * 100, 2) AS PERCENT_DRINKERS,
  ROUND(COUNT(CASE WHEN Stroke = 'Yes' THEN Stroke END) / COUNT(*) * 100, 2) AS PERCENT_STROKE,
  ROUND(COUNT(CASE WHEN Diabetic = 'Yes' THEN Diabetic END) / COUNT(*) * 100, 2) AS PERCENT_DIABETIC,
  ROUND(COUNT(CASE WHEN Asthma = 'Yes' THEN Asthma END) / COUNT(*) * 100, 2) AS PERCENT_ASTHMA,
  ROUND(COUNT(CASE WHEN KidneyDisease = 'Yes' THEN KidneyDisease END) / COUNT(*) * 100, 2) AS PERCENT_KIDNEY_DISEASE,
  ROUND(COUNT(CASE WHEN SkinCancer = 'Yes' THEN SkinCancer END) / COUNT(*) * 100, 2) AS PERCENT_SKIN_CANCER
FROM heart20
GROUP BY HeartDisease
ORDER BY HeartDisease DESC
"""
spark.sql(a2020q1).show()

+------------+------+---------------+----------------+--------------+----------------+--------------+----------------------+-------------------+
|HeartDisease| TOTAL|PERCENT_SMOKING|PERCENT_DRINKERS|PERCENT_STROKE|PERCENT_DIABETIC|PERCENT_ASTHMA|PERCENT_KIDNEY_DISEASE|PERCENT_SKIN_CANCER|
+------------+------+---------------+----------------+--------------+----------------+--------------+----------------------+-------------------+
|         Yes| 26584|          58.66|            4.16|         16.07|           33.69|         17.89|                 12.65|              18.18|
|          No|286430|          39.58|            7.09|           2.6|           11.12|          12.9|                  2.84|               8.47|
+------------+------+---------------+----------------+--------------+----------------+--------------+----------------------+-------------------+



In [17]:
# Heart-disease prevalence by self-reported general health.
# NOTE: "HighRisk" in the column names means HeartDisease = 'Yes' (respondents who
# reported heart disease); it is not a separate risk score.
# The percentage repeats the aggregates instead of using the HighRiskCount/Total aliases,
# because aliases referenced in the same SELECT list only resolve on Spark >= 3.4.
# COUNT(*) is the denominator so a NULL GeneralHealth group (where COUNT(GeneralHealth)
# would be 0) still gets a valid percentage.
a2020q2 = """
SELECT
  GeneralHealth,
  COUNT(*) AS Total,
  COUNT(CASE WHEN HeartDisease = 'Yes' THEN HeartDisease END) AS HighRiskCount,
  ROUND(COUNT(CASE WHEN HeartDisease = 'Yes' THEN HeartDisease END) / COUNT(*) * 100, 2) AS Percent_At_HighRisk
FROM heart20
GROUP BY GeneralHealth
ORDER BY Percent_At_HighRisk ASC
"""
spark.sql(a2020q2).show()

+-------------+------+-------------+-------------------+
|GeneralHealth| Total|HighRiskCount|Percent_At_HighRisk|
+-------------+------+-------------+-------------------+
|    Excellent| 66095|         1465|               2.22|
|    Very good|111792|         5239|               4.69|
|         Good| 90635|         9265|              10.22|
|         Fair| 33550|         6877|               20.5|
|         Poor| 10942|         3738|              34.16|
+-------------+------+-------------+-------------------+



In [10]:
# Release the cached heart20 view; the trailing `;` hides the empty `DataFrame[]` repr.
spark.sql('uncache table heart20');

DataFrame[]

In [11]:
# Save a local copy of the 2020 data.
# index=False stops the 0..n-1 RangeIndex produced by toPandas() from being written
# as an extra unnamed leading column, so the file keeps the same columns as the source CSV.
heart2020_df.toPandas().to_csv('2020_cleaned.csv', index=False)

2022 Queries

In [18]:
# Expose the 2022 DataFrame to Spark SQL under the view name `heart22`.
heart2022_df.createOrReplaceTempView(name='heart22')

In [19]:
# Cache the heart22 view for the query below; the trailing `;` hides the empty `DataFrame[]` repr.
spark.sql('cache table heart22');

DataFrame[]

In [28]:
# Share (%) of respondents with HighRiskLastYear_Yes = '1', per AgeCategory.
# NOTE(review): AgeCategory is an integer code (0-12) in this dataset, not the text
# brackets used in 2020; presumably 0 is the youngest bracket -- confirm the encoding.
# NOTE(review): HighRiskLastYear_Yes looks like a dummy/one-hot column, yet it is compared
# to the string '1', relying on Spark's implicit cast -- confirm its dtype.
# The percentage divides by COUNT(*) directly instead of the TOTAL alias, because aliases
# referenced in the same SELECT list only resolve on Spark >= 3.4.
a2022q1 = """
SELECT
  AgeCategory,
  COUNT(*) AS TOTAL,
  ROUND(COUNT(CASE WHEN HighRiskLastYear_Yes = '1' THEN HighRiskLastYear_Yes END) / COUNT(*) * 100, 2) AS PERCENT_AT_HIGHRISK
FROM heart22
GROUP BY AgeCategory
ORDER BY AgeCategory DESC
"""
spark.sql(a2022q1).show()

+-----------+-----+-------------------+
|AgeCategory|TOTAL|PERCENT_AT_HIGHRISK|
+-----------+-----+-------------------+
|         12|12173|               0.58|
|         11|11986|               0.68|
|         10|16804|               1.03|
|          9|18216|               1.34|
|          8|17436|               1.83|
|          7|14544|                2.7|
|          6|12991|                3.9|
|          5|10785|                4.9|
|          4|10786|               6.58|
|          3| 9716|                8.5|
|          2| 8359|              11.32|
|          1| 6979|               13.9|
|          0| 8248|              15.74|
+-----------+-----+-------------------+



In [15]:
# Release the cached heart22 view; the trailing `;` hides the empty `DataFrame[]` repr.
spark.sql('uncache table heart22');

DataFrame[]

In [16]:
# Save a local copy of the 2022 data.
# index=False stops the 0..n-1 RangeIndex produced by toPandas() from being written
# as an extra unnamed leading column.
heart2022_df.toPandas().to_csv('cleaned_df.csv', index=False)