<a href="https://colab.research.google.com/github/lisethrubio/Project4-Machine_Learning_Model/blob/Steven_branch2/bodyPerformance_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
# Find the latest version of spark 3.x  from https://downloads.apache.org/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.2'
spark_version = 'spark-3.5.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.83)] [Connected to cloud.r-                                                                                                    Hit:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.83)] [Waiting for headers]                                                                                                     Hit:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.83)] [Connected to r2u.stat                                                                                                    Hit:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:6 http://s

In [3]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder\
    .appName("SparkSQL")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

In [4]:
# Read in data from bodyPerformance.csv
from pyspark import SparkFiles
csv = "/content/bodyPerformance.csv"
spark.sparkContext.addFile(csv)
df = spark.read.csv(SparkFiles.get("bodyPerformance.csv"), sep=",", header=True)
df.show()

+----+------+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+-----+
| age|gender|height_cm|weight_kg|body fat_%|diastolic|systolic|gripForce|sit and bend forward_cm|sit-ups counts|broad jump_cm|class|
+----+------+---------+---------+----------+---------+--------+---------+-----------------------+--------------+-------------+-----+
|27.0|     M|    172.3|    75.24|      21.3|     80.0|   130.0|     54.9|                   18.4|          60.0|        217.0|    C|
|25.0|     M|    165.0|     55.8|      15.7|     77.0|   126.0|     36.4|                   16.3|          53.0|        229.0|    A|
|31.0|     M|    179.6|     78.0|      20.1|     92.0|   152.0|     44.8|                   12.0|          49.0|        181.0|    C|
|32.0|     M|    174.5|     71.1|      18.4|     76.0|   147.0|     41.4|                   15.2|          53.0|        219.0|    B|
|28.0|     M|    173.8|     67.7|      17.1|     70.0|   127.0|     4

In [5]:
# Rename the column to remove spaces and special characters
df = df.withColumnRenamed("body fat_%", "body_fat_percent") \
       .withColumnRenamed("sit and bend forward_cm", "sit_and_bend_forward_cm") \
       .withColumnRenamed("sit-ups counts", "sit_ups_counts") \
       .withColumnRenamed("broad jump_cm", "broad_jump_cm")
df.show()

+----+------+---------+---------+----------------+---------+--------+---------+-----------------------+--------------+-------------+-----+
| age|gender|height_cm|weight_kg|body_fat_percent|diastolic|systolic|gripForce|sit_and_bend_forward_cm|sit_ups_counts|broad_jump_cm|class|
+----+------+---------+---------+----------------+---------+--------+---------+-----------------------+--------------+-------------+-----+
|27.0|     M|    172.3|    75.24|            21.3|     80.0|   130.0|     54.9|                   18.4|          60.0|        217.0|    C|
|25.0|     M|    165.0|     55.8|            15.7|     77.0|   126.0|     36.4|                   16.3|          53.0|        229.0|    A|
|31.0|     M|    179.6|     78.0|            20.1|     92.0|   152.0|     44.8|                   12.0|          49.0|        181.0|    C|
|32.0|     M|    174.5|     71.1|            18.4|     76.0|   147.0|     41.4|                   15.2|          53.0|        219.0|    B|
|28.0|     M|    173.8|    

In [8]:
# Create a temporary view
df.createOrReplaceTempView('health')

In [9]:
# Run a sql query that groups the data on class, showing body fat and blood pressure
start_time = time.time()

spark.sql("""select class, avg(body_fat_percent), avg(diastolic), avg(systolic) from health group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-----+---------------------+-----------------+------------------+
|class|avg(body_fat_percent)|   avg(diastolic)|     avg(systolic)|
+-----+---------------------+-----------------+------------------+
|    B|   22.039980785778287|78.65646847923514|130.63934867045117|
|    D|   27.735432430576317|80.07653030755449|131.08450283666767|
|    C|   22.644563855479248|  78.550671842341| 129.9253508510003|
|    A|    20.53915947879328|77.90334528076464| 129.2900238948626|
+-----+---------------------+-----------------+------------------+

--- 3.190762996673584 seconds ---


In [10]:
# Write out the data in parquet format
df.write.parquet('parquet_health', mode='overwrite')

In [11]:
# Read in our new parquet formatted data
p_df=spark.read.parquet('parquet_health')

In [12]:
# A parquet formatted DataFrame has all the same methods as a row-based dataframe
# We can convert the dataframe to a view.
p_df.createOrReplaceTempView('p_health')

In [13]:
start_time = time.time()

spark.sql("""select class, avg(body_fat_percent), avg(diastolic), avg(systolic) from p_health group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-----+---------------------+-----------------+------------------+
|class|avg(body_fat_percent)|   avg(diastolic)|     avg(systolic)|
+-----+---------------------+-----------------+------------------+
|    B|   22.039980785778287|78.65646847923514|130.63934867045117|
|    D|   27.735432430576317|80.07653030755449|131.08450283666767|
|    C|   22.644563855479248|  78.550671842341| 129.9253508510003|
|    A|    20.53915947879328|77.90334528076464| 129.2900238948626|
+-----+---------------------+-----------------+------------------+

--- 1.9916579723358154 seconds ---


In [14]:
# Run a sql query that groups the data on class, showing exercise scores
start_time = time.time()

spark.sql("""select class, avg(gripForce), avg(sit_and_bend_forward_cm), avg(sit_ups_counts), avg(broad_jump_cm) from health group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-----+-----------------+----------------------------+-------------------+------------------+
|class|   avg(gripForce)|avg(sit_and_bend_forward_cm)|avg(sit_ups_counts)|avg(broad_jump_cm)|
+-----+-----------------+----------------------------+-------------------+------------------+
|    B|37.91043621153271|           17.46758589781895|  42.63919928293994|195.33256647744247|
|    D|34.74774559570019|           7.590259779038491| 29.881337712750078|173.82084204240073|
|    C|36.58266348163633|            14.3906449686474|  38.72021498954912| 188.6279486413855|
|    A| 38.6157228195937|           21.39177718040618|  47.84826762246117|202.74402628434888|
+-----+-----------------+----------------------------+-------------------+------------------+

--- 1.2842769622802734 seconds ---


In [15]:
start_time = time.time()

spark.sql("""select class, avg(gripForce), avg(sit_and_bend_forward_cm), avg(sit_ups_counts), avg(broad_jump_cm) from p_health group by 1""").show()

print("--- %s seconds ---" % (time.time() - start_time))

+-----+-----------------+----------------------------+-------------------+------------------+
|class|   avg(gripForce)|avg(sit_and_bend_forward_cm)|avg(sit_ups_counts)|avg(broad_jump_cm)|
+-----+-----------------+----------------------------+-------------------+------------------+
|    B|37.91043621153271|           17.46758589781895|  42.63919928293994|195.33256647744247|
|    D|34.74774559570019|           7.590259779038491| 29.881337712750078|173.82084204240073|
|    C|36.58266348163633|            14.3906449686474|  38.72021498954912| 188.6279486413855|
|    A| 38.6157228195937|           21.39177718040618|  47.84826762246117|202.74402628434888|
+-----+-----------------+----------------------------+-------------------+------------------+

--- 1.8317821025848389 seconds ---
