# PySpark tutorial

## What is PySpark?


PySpark is a Python API for working with Apache Spark.

* **Python API**: you can use the syntax and agility of Python to interact with and send commands to a system that is not, at its core, based on Python.

* **Apache Spark**: a system designed for processing, analyzing and modeling immense amounts of data across many computers at the same time. Put differently, Apache Spark lets you run computations in parallel instead of sequentially. It lets you divide one very large task into many smaller tasks and run each of them on a different machine. This lets you reach your analysis goals in a reasonable time, which would not be possible on a single machine.

Usually, we consider data suited to PySpark to be data that would not fit into a single machine's storage (let alone its RAM).

**Important related concepts:**
1. Distributed computing - splitting a task into several smaller tasks that run at the same time. This is what PySpark lets you do across many machines, but it can also be done on a single machine, for example with several threads.
2. Cluster - a network of machines that can take on tasks from a user, interact with one another and return results. These machines provide the computing resources that PySpark uses to run the computations.
3. Resilient Distributed Dataset (RDD) - an immutable distributed collection of data. It is not tabular and has no data schema. Therefore, for tabular data wrangling, DataFrames offer more API options and under-the-hood optimizations. Still, you might encounter RDDs as you learn more about Spark, and should be aware of their existence.
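
To make the idea of distributed computing concrete, here is a minimal single-machine sketch using only the Python standard library. It is not Spark code: the helper names (`split_into_chunks`, `sum_of_squares`) are made up for illustration, and threads in CPython mainly demonstrate the split, process, combine pattern rather than real CPU parallelism.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_chunks(items, n_chunks):
    """Split a list into n_chunks contiguous, roughly equal parts."""
    size, remainder = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < remainder else 0)
        chunks.append(items[start:end])
        start = end
    return chunks

def sum_of_squares(chunk):
    """The small task that each worker runs on its own chunk."""
    return sum(x * x for x in chunk)

numbers = list(range(1, 1001))
chunks = split_into_chunks(numbers, n_chunks=4)

# each chunk is handed to a separate worker thread
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_results = list(pool.map(sum_of_squares, chunks))

# combine the partial results into the final answer
total = sum(partial_results)
print(total)
```

Spark applies the same pattern at a much larger scale: the chunks become partitions of the data, and the workers are processes on the machines of a cluster.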

**Parts of PySpark we will cover:**
1. PySpark SQL - contains commands for data processing and manipulation.
2. PySpark MLlib - includes a variety of models, model training and related commands.

**Spark Architecture:**
To send commands to a cluster and receive results from it, you need to start a Spark session. This object is your tool for interacting with Spark. Each user of the cluster has their own Spark session, which lets them use the cluster in isolation from other users. All sessions communicate with a Spark context, which runs in the driver program and acts as the master node of the cluster: it assigns tasks to the computers in the cluster and coordinates them. Each computer in the cluster that performs tasks for a master node is called a worker node. To connect to a worker node, the master node needs that node's compute power to be allocated to it by a cluster manager, which is responsible for distributing the cluster's resources. Inside each worker node there are executor programs that run the tasks; they can run multiple tasks simultaneously, and each has its own cache for storing results. So each master node can have multiple worker nodes, and each worker node can have multiple tasks running.

## Part01- install, import, and Session

In [1]:
!pip install pyspark # ~ 1 min

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=e23d38f273fb0f1d90160a75fe7e0d159a466ba1346d3a2a4fbe40b916c326b1
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
!gdown --id 1mnHABJPgkjhfwPpYWwn5LONVg31A0WWh

Downloading...
From: https://drive.google.com/uc?id=1mnHABJPgkjhfwPpYWwn5LONVg31A0WWh
To: /content/heart.csv
100% 35.9k/35.9k [00:00<00:00, 53.4MB/s]


In [3]:
!ls

heart.csv  sample_data


**Heart Failure Prediction Dataset**: https://www.kaggle.com/fedesoriano/heart-failure-prediction


In [4]:
import pyspark

In [5]:
pyspark.__version__

'3.4.0'

In [6]:
# a SparkSession object can perform the most common data processing tasks
from pyspark.sql import SparkSession

In [7]:
# will return existing session if one was created before and was not closed
spark = SparkSession.builder.appName('youtube_test').getOrCreate()

In [8]:
spark

## Part02- read and write (save)

In [9]:
# let PySpark infer the schema
df = spark.read.csv('heart.csv', header=True, nullValue='Nan', inferSchema=True)

In [10]:
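# save the DataFrame; Spark writes a directory named 'heart2.csv' that contains part files
df.write.format('csv').save('heart2.csv')
# the default mode raises an error if the path already exists; 'overwrite' replaces it
df.write.format('csv').mode('overwrite').save('heart2.csv')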

In [11]:
type(df)

pyspark.sql.dataframe.DataFrame

In [12]:
pd_df = df.toPandas()

In [13]:
type(pd_df)

pandas.core.frame.DataFrame

In [14]:
df_sp = spark.createDataFrame(pd_df)

In [15]:
type(df_sp)

pyspark.sql.dataframe.DataFrame

## Part03- Show df

In [None]:
# show the first 20 rows of the table (the default for show())
df.show()

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
| 39|  M|          NAP|      120|        339|        0| 

In [None]:
#df.take(3)
#df.limit(3)
#df.head(3)
#df.collect()

In [None]:
df.columns

['Age',
 'Sex',
 'ChestPainType',
 'RestingBP',
 'Cholesterol',
 'FastingBS',
 'RestingECG',
 'MaxHR',
 'ExerciseAngina',
 'Oldpeak',
 'ST_Slope',
 'HeartDisease']

In [None]:
df.describe().show()

+-------+------------------+----+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|summary|               Age| Sex|ChestPainType|         RestingBP|       Cholesterol|          FastingBS|RestingECG|             MaxHR|ExerciseAngina|           Oldpeak|ST_Slope|       HeartDisease|
+-------+------------------+----+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|  count|               918| 918|          918|               918|               918|                918|       918|               918|           918|               918|     918|                918|
|   mean|53.510893246187365|null|         null|132.39651416122004| 198.7995642701525|0.23311546840958605|      null|136.80936819172112|          null|0.8873638344226581|    null| 0.5533769063180828|
| std

In [None]:
len(df.columns)

12

In [None]:
df.count()

918

## Part04- Data Types | Create and Drop Columns

In [None]:
df.dtypes

[('Age', 'int'),
 ('Sex', 'string'),
 ('ChestPainType', 'string'),
 ('RestingBP', 'int'),
 ('Cholesterol', 'int'),
 ('FastingBS', 'int'),
 ('RestingECG', 'string'),
 ('MaxHR', 'int'),
 ('ExerciseAngina', 'string'),
 ('Oldpeak', 'double'),
 ('ST_Slope', 'string'),
 ('HeartDisease', 'int')]

In [None]:
### Check the schema
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: integer (nullable = true)



In [None]:
df.select('Age').show(3)

+---+
|Age|
+---+
| 40|
| 49|
| 37|
+---+
only showing top 3 rows



In [None]:
df.select(['Age', 'Sex']).show(3)

+---+---+
|Age|Sex|
+---+---+
| 40|  M|
| 49|  F|
| 37|  M|
+---+---+
only showing top 3 rows



In [None]:
### Adding Columns in data frame
df=df.withColumn('Age 2023', df['Age']+2)

In [None]:
df.show(3)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|Age 2023|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|      42|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|      51|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|      39|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------+
only showing top 3 rows



In [None]:
### Drop the columns
df = df.drop('Age 2023')

In [None]:
df.show(3)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 3 rows



In [None]:
### Rename the columns
df.withColumnRenamed('HeartDisease', 'stroke').show(3)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|stroke|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|     0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|     1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|     0|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------+
only showing top 3 rows



In [None]:
from pyspark.sql.types import FloatType, IntegerType, StringType, BooleanType, DoubleType
df = df.withColumn('Age', df['Age'].cast(FloatType()))
df.printSchema()

root
 |-- Age: float (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: integer (nullable = true)



## Part05- Filtering Queries

In [None]:
### simple query
df[df['Age']>65].show(3)

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|66.0|  M|          ASY|      140|        139|        0|    Normal|   94|             Y|    1.0|    Flat|           1|
|74.0|  M|          ATA|      145|          0|        1|        ST|  123|             N|    1.3|      Up|           1|
|68.0|  M|          ASY|      145|          0|        1|    Normal|  136|             N|    1.8|      Up|           1|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 3 rows



In [None]:
df.filter('Age > 35').show(3)

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|40.0|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
|49.0|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
|37.0|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 3 rows



In [None]:
df.where('Age < 30').show(3)

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|29.0|  M|          ATA|      120|        243|        0|    Normal|  160|             N|    0.0|      Up|           0|
|28.0|  M|          ATA|      130|        132|        0|       LVH|  185|             N|    0.0|      Up|           0|
|29.0|  M|          ATA|      140|        263|        0|    Normal|  170|             N|    0.0|      Up|           0|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 3 rows



In [None]:
df.where((df['Age']>60) & (df['Sex']=='F')).show(3)

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|61.0|  F|          ASY|      130|        294|        0|        ST|  120|             Y|    1.0|    Flat|           0|
|62.0|  F|           TA|      160|        193|        0|    Normal|  116|             N|    0.0|      Up|           0|
|62.0|  F|          ASY|      120|          0|        1|        ST|  123|             Y|    1.7|    Down|           1|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 3 rows



## Part06 - Missing values handling

In [None]:
data = [
    ("James",None,"M", 1),
    ("Anna","NY","F", 0),
    ("Julia",None,None, 1),
    (None,"LA","M", None),
    (None,None,None, None),
    ("Reza",None,"M", None)
  ]
columns = ["name","state","gender", "mariage"]

In [None]:
df_temp = spark.createDataFrame(data, columns)
df_temp.show()

+-----+-----+------+-------+
| name|state|gender|mariage|
+-----+-----+------+-------+
|James| null|     M|      1|
| Anna|   NY|     F|      0|
|Julia| null|  null|      1|
| null|   LA|     M|   null|
| null| null|  null|   null|
| Reza| null|     M|   null|
+-----+-----+------+-------+



In [None]:
df_temp.na.drop().show() # default: how='any'

+----+-----+------+-------+
|name|state|gender|mariage|
+----+-----+------+-------+
|Anna|   NY|     F|      0|
+----+-----+------+-------+



In [None]:
df_temp.na.drop(how='all').show()

+-----+-----+------+-------+
| name|state|gender|mariage|
+-----+-----+------+-------+
|James| null|     M|      1|
| Anna|   NY|     F|      0|
|Julia| null|  null|      1|
| null|   LA|     M|   null|
| Reza| null|     M|   null|
+-----+-----+------+-------+



In [None]:
df_temp.na.drop(thresh=3).show()

+-----+-----+------+-------+
| name|state|gender|mariage|
+-----+-----+------+-------+
|James| null|     M|      1|
| Anna|   NY|     F|      0|
+-----+-----+------+-------+
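
The three `na.drop` variants above differ only in how many non-null values a row needs in order to survive. The following plain-Python sketch mirrors that rule on the same rows; it is an illustration of the semantics, not how Spark implements them, and the helper names are made up.

```python
data = [
    ("James", None, "M", 1),
    ("Anna", "NY", "F", 0),
    ("Julia", None, None, 1),
    (None, "LA", "M", None),
    (None, None, None, None),
    ("Reza", None, "M", None),
]

def non_null_count(row):
    """Number of values in the row that are not None."""
    return sum(value is not None for value in row)

def drop_any(rows):
    # how='any': drop a row if at least one value is null
    return [r for r in rows if non_null_count(r) == len(r)]

def drop_all(rows):
    # how='all': drop a row only if every value is null
    return [r for r in rows if non_null_count(r) > 0]

def drop_thresh(rows, thresh):
    # thresh=n: keep a row only if it has at least n non-null values
    return [r for r in rows if non_null_count(r) >= thresh]

kept_any = [r[0] for r in drop_any(data)]
kept_all = [r[0] for r in drop_all(data)]
kept_thresh3 = [r[0] for r in drop_thresh(data, 3)]

print(kept_any)      # ['Anna']
print(kept_all)      # ['James', 'Anna', 'Julia', None, 'Reza']
print(kept_thresh3)  # ['James', 'Anna']
```

The printed names match the rows kept in the three Spark outputs above.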



In [None]:
df_temp.na.fill(value='?', subset=['state', 'name']).show()

+-----+-----+------+-------+
| name|state|gender|mariage|
+-----+-----+------+-------+
|James|    ?|     M|      1|
| Anna|   NY|     F|      0|
|Julia|    ?|  null|      1|
|    ?|   LA|     M|   null|
|    ?|    ?|  null|   null|
| Reza|    ?|     M|   null|
+-----+-----+------+-------+



In [None]:
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCol='mariage',
                  outputCol='mariage').setStrategy('mode')
imputer.fit(df_temp).transform(df_temp).show()

+-----+-----+------+-------+
| name|state|gender|mariage|
+-----+-----+------+-------+
|James| null|     M|      1|
| Anna|   NY|     F|      0|
|Julia| null|  null|      1|
| null|   LA|     M|      1|
| null| null|  null|      1|
| Reza| null|     M|      1|
+-----+-----+------+-------+
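
What the `'mode'` strategy does to the `mariage` column can be sketched in a few lines of plain Python: find the most frequent observed value and use it for every missing entry. This is only an illustration, not the `Imputer` implementation, and the tie-breaking rule (smallest value wins) is an assumption of the sketch.

```python
from collections import Counter

def impute_mode(values):
    """Replace None with the most frequent observed value."""
    observed = [v for v in values if v is not None]
    counts = Counter(observed)
    # sort by descending count, then ascending value;
    # the tie-break rule is an assumption of this sketch
    mode_value = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[0][0]
    return [mode_value if v is None else v for v in values]

# the 'mariage' column from df_temp, in row order
mariage = [1, 0, 1, None, None, None]
imputed = impute_mode(mariage)
print(imputed)  # [1, 0, 1, 1, 1, 1]
```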



## Part 07 - groupBy | orderBy

In [None]:
df.show(5)

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|40.0|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
|49.0|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
|37.0|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
|48.0|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
|54.0|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
+----+---+-------------+---------+-----------+--

In [None]:
df.groupBy('Age').count().show(10)

+----+-----+
| Age|count|
+----+-----+
|64.0|   22|
|47.0|   19|
|58.0|   42|
|39.0|   15|
|30.0|    1|
|41.0|   24|
|29.0|    3|
|67.0|   15|
|49.0|   21|
|56.0|   38|
+----+-----+
only showing top 10 rows



In [None]:
df.groupBy('Sex').count().show()

+---+-----+
|Sex|count|
+---+-----+
|  F|  193|
|  M|  725|
+---+-----+



In [None]:
df.groupBy(['Age', 'Sex']).count().orderBy('Age').show(5)

+---+---+-----+
|Age|Sex|count|
+---+---+-----+
| 28|  M|    1|
| 29|  M|    3|
| 30|  F|    1|
| 31|  F|    1|
| 31|  M|    1|
+---+---+-----+
only showing top 5 rows



In [None]:
# pivot
df.groupBy('Age').pivot('Sex', ('M', 'F')).count().show(5)

+---+---+----+
|Age|  M|   F|
+---+---+----+
| 31|  1|   1|
| 65| 17|   4|
| 53| 27|   6|
| 34|  5|   2|
| 28|  1|null|
+---+---+----+
only showing top 5 rows



In [None]:
df.groupBy('Age').count().orderBy('count').show(10)

+----+-----+
| Age|count|
+----+-----+
|30.0|    1|
|73.0|    1|
|28.0|    1|
|77.0|    2|
|76.0|    2|
|33.0|    2|
|31.0|    2|
|29.0|    3|
|75.0|    3|
|72.0|    4|
+----+-----+
only showing top 10 rows



In [None]:
from pyspark.sql.functions import desc, asc
df.groupBy('Age').count().orderBy(asc('count')).show(10)

+----+-----+
| Age|count|
+----+-----+
|30.0|    1|
|73.0|    1|
|28.0|    1|
|77.0|    2|
|76.0|    2|
|33.0|    2|
|31.0|    2|
|29.0|    3|
|75.0|    3|
|72.0|    4|
+----+-----+
only showing top 10 rows



In [None]:
df.groupBy('Age').count().orderBy(desc('Age')).show(10)

+----+-----+
| Age|count|
+----+-----+
|77.0|    2|
|76.0|    2|
|75.0|    3|
|74.0|    7|
|73.0|    1|
|72.0|    4|
|71.0|    5|
|70.0|    7|
|69.0|   13|
|68.0|   10|
+----+-----+
only showing top 10 rows



In [None]:
# note: this import shadows Python's built-in min and max for the rest of the notebook
from pyspark.sql.functions import min, max, avg
df.groupBy('HeartDisease').agg(max('Cholesterol')).show(5)

+------------+----------------+
|HeartDisease|max(Cholesterol)|
+------------+----------------+
|           1|             603|
|           0|             564|
+------------+----------------+



In [None]:
df.groupBy('HeartDisease').agg(avg('Cholesterol')).show(5)

+------------+------------------+
|HeartDisease|  avg(Cholesterol)|
+------------+------------------+
|           1|175.94094488188978|
|           0| 227.1219512195122|
+------------+------------------+



In [None]:
df.groupBy('HeartDisease').agg(max('Cholesterol'), avg('Cholesterol')).show(5)

+------------+----------------+------------------+
|HeartDisease|max(Cholesterol)|  avg(Cholesterol)|
+------------+----------------+------------------+
|           1|             603|175.94094488188978|
|           0|             564| 227.1219512195122|
+------------+----------------+------------------+



## Part 08 - MLlib | Regression

In [None]:
df.show(5)

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+------------------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|          feat_vec|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+------------------+
|40.0|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|[40.0,140.0,289.0]|
|49.0|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|[49.0,160.0,180.0]|
|37.0|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|[37.0,130.0,283.0]|
|48.0|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|[48.0,138.0,214.0]|
|54.0|  M|          NAP|      150|

In [None]:
X_cols = ['Age', 'RestingBP', 'Cholesterol']
target_col = ['MaxHR']
my_cols = df['Age', 'RestingBP', 'Cholesterol', 'MaxHR']

In [None]:
from pyspark.ml.feature import VectorAssembler
VA = VectorAssembler(inputCols=X_cols, outputCol='feat_vec')
df = VA.transform(df)
data = df.select(['Age', 'RestingBP', 'Cholesterol', 'feat_vec', 'MaxHR'])

In [None]:
trainset, testset = data.randomSplit([0.75, 0.25])
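
A split like `randomSplit([0.75, 0.25])` assigns each row to a split at random, so the resulting sizes are only approximately 75% and 25%, and they change between runs unless a seed is fixed. The plain-Python sketch below illustrates that idea under simplifying assumptions; the `random_split` helper is hypothetical and is not Spark's sampling algorithm.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split, with probability proportional to its weight."""
    total = sum(weights)
    # cumulative boundaries in [0, 1], e.g. [0.75, 1.0] for weights [0.75, 0.25]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # the last split also catches any floating-point rounding at the boundary
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(918))  # stand-in for the 918 rows of the dataset
train, test = random_split(rows, [0.75, 0.25], seed=42)
print(len(train), len(test))  # close to 688 and 230, but not exactly
```

In PySpark, `randomSplit` also accepts a `seed` argument, which is useful when the reported metrics need to be reproducible.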

In [None]:
from pyspark.ml.regression import LinearRegression
LR = LinearRegression(featuresCol='feat_vec', labelCol='MaxHR')

In [None]:
# train the model
LR = LR.fit(trainset)

In [None]:
LR.coefficients

DenseVector([-0.9729, -0.0339, 0.0382])

In [None]:
pred = LR.transform(testset)

In [None]:
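# evaluate() returns a summary object with metrics; note that it replaces the predictions stored in pred
pred = LR.evaluate(testset)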

In [None]:
pred.r2

0.19543698766778028

In [None]:
pred.meanSquaredError

536.4682135181364

In [None]:
pred.meanAbsoluteError

19.13756656234133
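
For reference, the three metrics read from the summary above follow the standard definitions of R², mean squared error and mean absolute error. The sketch below computes them by hand on a few made-up values (the numbers are hypothetical and are not taken from the heart dataset).

```python
def regression_metrics(y_true, y_pred):
    """Compute R^2, MSE and MAE from true and predicted values."""
    n = len(y_true)
    mean_true = sum(y_true) / n
    errors = [t - p for t, p in zip(y_true, y_pred)]

    mse = sum(e * e for e in errors) / n    # mean squared error
    mae = sum(abs(e) for e in errors) / n   # mean absolute error

    # R^2 = 1 - (residual sum of squares / total sum of squares)
    residual_ss = sum(e * e for e in errors)
    total_ss = sum((t - mean_true) ** 2 for t in y_true)
    r2 = 1 - residual_ss / total_ss
    return {"r2": r2, "mse": mse, "mae": mae}

# hypothetical MaxHR values and predictions
y_true = [150.0, 120.0, 170.0, 140.0]
y_pred = [145.0, 130.0, 160.0, 145.0]
metrics = regression_metrics(y_true, y_pred)
print(metrics)
```

An R² near 0.2, as in the run above, means the three features explain only about a fifth of the variance in `MaxHR`.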

## Part 09 - MLlib | Classification

In [None]:
df.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
+---+---+-------------+---------+-----------+---------+-

In [None]:
df = df.drop('feat_vec')

In [None]:
df.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
+---+---+-------------+---------+-----------+---------+-

In [None]:
X_cols = ['Age', 'RestingBP', 'Cholesterol', 'MaxHR']
target_col = ['HeartDisease']
my_col = ['Age', 'RestingBP', 'Cholesterol', 'MaxHR', 'HeartDisease']

In [None]:
VA = VectorAssembler(inputCols=X_cols, outputCol='feature_vector')
df = VA.transform(df)
df.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|      feature_vector|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+--------------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|[40.0,140.0,289.0...|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|[49.0,160.0,180.0...|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|[37.0,130.0,283.0...|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|[48.0,138.0,214.0...|
| 54|  M|          NAP|    

In [None]:
data = df.select(['feature_vector', 'HeartDisease'])
data.show(5)

+--------------------+------------+
|      feature_vector|HeartDisease|
+--------------------+------------+
|[40.0,140.0,289.0...|           0|
|[49.0,160.0,180.0...|           1|
|[37.0,130.0,283.0...|           0|
|[48.0,138.0,214.0...|           1|
|[54.0,150.0,195.0...|           0|
+--------------------+------------+
only showing top 5 rows



In [None]:
trainset, testset = data.randomSplit([0.75, 0.25])

In [None]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression

In [None]:
RFC = RandomForestClassifier(featuresCol='feature_vector', labelCol='HeartDisease')
LRC = LogisticRegression(featuresCol='feature_vector', labelCol='HeartDisease')

In [None]:
RFC = RFC.fit(trainset)
LRC = LRC.fit(trainset)

In [None]:
rf_pred = RFC.transform(testset)
lr_pred = LRC.transform(testset)

In [None]:
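from pyspark.ml.evaluation import BinaryClassificationEvaluator
# named 'evaluator' to avoid shadowing Python's built-in eval; the default metric is areaUnderROC
evaluator = BinaryClassificationEvaluator(labelCol='HeartDisease')

In [None]:
rf_auc = evaluator.evaluate(rf_pred)
lr_auc = evaluator.evaluate(lr_pred)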

In [None]:
rf_auc

0.7837262834821432

In [None]:
lr_auc

0.7488141741071432
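
The two numbers above are areas under the ROC curve. One way to read that quantity: it is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one. The sketch below computes it that way on made-up labels and scores; it is a different computation from the one Spark performs internally, shown only to make the metric tangible.

```python
def roc_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # ties count as half
    return wins / (len(positives) * len(negatives))

# hypothetical labels and predicted probabilities of heart disease
labels = [0, 0, 1, 1, 1, 0]
scores = [0.1, 0.4, 0.35, 0.8, 0.7, 0.5]
auc = roc_auc(labels, scores)
print(auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, so both models above do better than chance with these four features.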

## Part 10 - Categorical ---> Numeric

In [None]:
df = df.drop('feature_vector')

In [None]:
df.show(2)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 2 rows



In [None]:
from pyspark.ml.feature import StringIndexer
sex_ind = StringIndexer(inputCol='Sex', outputCol='Sex_idx')

In [None]:
sex_ind_fit = sex_ind.fit(df)

In [None]:
df_new = sex_ind_fit.transform(df)

In [None]:
df_new.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|Sex_idx|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|    0.0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|    1.0|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|    0.0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|    1.0|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0| 

In [None]:
cpt_ind = StringIndexer(inputCol='ChestPainType', outputCol='ChestPainType_idx')

In [None]:
df_new_2 = cpt_ind.fit(df_new).transform(df_new)

In [None]:
df_new_2.select('ChestPainType', 'ChestPainType_idx').show(10)

+-------------+-----------------+
|ChestPainType|ChestPainType_idx|
+-------------+-----------------+
|          ATA|              2.0|
|          NAP|              1.0|
|          ATA|              2.0|
|          ASY|              0.0|
|          NAP|              1.0|
|          NAP|              1.0|
|          ATA|              2.0|
|          ATA|              2.0|
|          ASY|              0.0|
|          ATA|              2.0|
+-------------+-----------------+
only showing top 10 rows
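
The indices above are not arbitrary: by default `StringIndexer` orders labels by descending frequency, so the most common label gets 0.0. The plain-Python sketch below reproduces that ordering rule on a small made-up sample; the `frequency_index` helper is hypothetical and only mimics the default behavior.

```python
from collections import Counter

def frequency_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # descending frequency; labels with equal frequency are sorted alphabetically
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# made-up sample, not the real column
chest_pain = ["ASY", "NAP", "ASY", "ATA", "ASY", "NAP"]
mapping = frequency_index(chest_pain)
encoded = [mapping[v] for v in chest_pain]

print(mapping)  # {'ASY': 0.0, 'NAP': 1.0, 'ATA': 2.0}
print(encoded)  # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```

Keep in mind that these indices impose an order on the categories, which some models may treat as meaningful.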



## Part 11 - 13 - JOINs

In [17]:
df.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 5 rows

In [18]:
df.cache()
df.count()

918

In [19]:
from pyspark.sql.functions import monotonically_increasing_id
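# Add a unique index column. The IDs are unique and increasing, but they are
# consecutive (0, 1, 2, ...) only within a single partition.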
df = df.withColumn('index', monotonically_increasing_id())
df.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|index|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|    0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|    1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|    2|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|    3|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|    4|
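+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+-----+
only showing top 5 rows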
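
`monotonically_increasing_id()` guarantees unique, increasing IDs, but not consecutive ones. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below mimics that layout in plain Python; `sketch_id` is a hypothetical helper for illustration, not a Spark function.

```python
def sketch_id(partition_id, row_in_partition):
    """Mimic the documented layout: partition ID in the upper bits, row number in the lower 33 bits."""
    return (partition_id << 33) + row_in_partition

# Two partitions with three rows each.
ids = [sketch_id(p, r) for p in range(2) for r in range(3)]
print(ids)
```

With a single partition the IDs are simply 0, 1, 2, ..., which matches the output above. On data spread over several partitions they jump between partitions, so they should not be treated as row numbers.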

In [20]:
# Move 'index' (currently the last column) to the front
df = df.select('index', *df.columns[:-1])
df.show(5)

+-----+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|index|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+-----+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|    0| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
|    1| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
|    2| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
|    3| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
|    4| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
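+-----+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
only showing top 5 rows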

In [None]:
# Download the second dataset from Google Drive (requires the gdown package)
!gdown --id 1NJ8tAEdiC7b7iEzeBb7XiyCYNsWNBIAP
df2 = spark.read.csv('patients.csv', header=True, inferSchema=True)
df2.show(5)

In [48]:
df_inner = df.join(df2, df.index==df2.id, how='inner')

In [49]:
df_inner.show(5)

+-----+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+---+---------+---------+
|index|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease| id|FirstName| LastName|
+-----+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+---+---------+---------+
|    4| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|  4|     Paul|   Murray|
|    5| 39|  M|          NAP|      120|        339|        0|    Normal|  170|             N|    0.0|      Up|           0|  5|    Lydia|   Walker|
|    6| 45|  F|          ATA|      130|        237|        0|    Normal|  170|             N|    0.0|      Up|           0|  6|    Garry|   Harper|
|    7| 54|  M|          ATA|      110|        208|        0|    Normal|  142|             N|    0.0|      Up|  

In [50]:
df_inner.count()

914

In [51]:
df_outer = df.join(df2, df.index==df2.id, how='outer')

In [52]:
df_outer.show(5)

+-----+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+----+---------+--------+
|index|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|  id|FirstName|LastName|
+-----+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+----+---------+--------+
|    0| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|null|     null|    null|
|    1| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|null|     null|    null|
|    2| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|null|     null|    null|
|    3| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|  

In [53]:
df_outer.count()

1004

In [54]:
df_left = df.join(df2, df.index==df2.id, how='left')

In [55]:
df_left.count()

918

In [56]:
df_right = df.join(df2, df.index==df2.id, how='right')
df_right.count()

1000
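
The four counts above fit together if the join keys are unique on both sides. The plain-Python sketch below reproduces them with sets. It assumes `df.index` covers 0..917 and `df2.id` covers 4..1003; the second range is a guess that is consistent with the counts and the first inner-join rows shown, not something read from `patients.csv`.

```python
# Assumed key sets (hypothetical; chosen to match the counts shown above).
left_keys = set(range(0, 918))     # keys of df (918 rows)
right_keys = set(range(4, 1004))   # keys of df2 (1000 rows)

inner = len(left_keys & right_keys)   # keys present on both sides
outer = len(left_keys | right_keys)   # keys present on either side
left = len(left_keys)                 # a left join keeps every left key
right = len(right_keys)               # a right join keeps every right key

print(inner, outer, left, right)

# Inclusion-exclusion: outer = left + right - inner
assert outer == left + right - inner
```

If keys were duplicated on either side, each match would produce an extra row and these identities would no longer hold, so checking `count()` after a join is a cheap sanity test.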

## Part 14 - Builtin Functions

In [58]:
# Numeric functions
# Note: this import shadows Python's built-in min() and max() in this notebook
from pyspark.sql.functions import min, max, avg

In [61]:
df.select(min('Cholesterol'), max('Cholesterol'), avg('Cholesterol')).show()

+----------------+----------------+-----------------+
|min(Cholesterol)|max(Cholesterol)| avg(Cholesterol)|
+----------------+----------------+-----------------+
|               0|             603|198.7995642701525|
+----------------+----------------+-----------------+



In [62]:
# String functions (substring positions are 1-based)
from pyspark.sql.functions import lower, upper, substring

In [67]:
df.select(lower('ChestPainType'), upper('ChestPainType'), substring('ChestPainType', 2, 2)).show(5)

+--------------------+--------------------+------------------------------+
|lower(ChestPainType)|upper(ChestPainType)|substring(ChestPainType, 2, 2)|
+--------------------+--------------------+------------------------------+
|                 ata|                 ATA|                            TA|
|                 nap|                 NAP|                            AP|
|                 ata|                 ATA|                            TA|
|                 asy|                 ASY|                            SY|
|                 nap|                 NAP|                            AP|
+--------------------+--------------------+------------------------------+
only showing top 5 rows
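
Spark's `substring(str, pos, len)` counts positions from 1, unlike Python slicing, which counts from 0. Below is a small plain-Python equivalent for positive `pos` values; `spark_like_substring` is a hypothetical helper name used only for this illustration.

```python
def spark_like_substring(s, pos, length):
    """Emulate substring with a 1-based, positive start position."""
    start = pos - 1  # convert the 1-based position to a 0-based index
    return s[start:start + length]

result = spark_like_substring('ATA', 2, 2)
print(result)
```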



In [69]:
from pyspark.sql.functions import expr

In [70]:
# Add a demo 'date' column: today plus a random offset of 0-364 days
df = df.withColumn('date', expr('date_add(current_timestamp(), cast(rand()*365 as int))'))
df.show(5)

+-----+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+----------+
|index|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|      date|
+-----+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+----------+
|    0| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|2024-01-23|
|    1| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|2023-11-20|
|    2| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|2024-01-12|
|    3| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|2023-08-06|
|    4| 54|  M|          NAP|      150|        195|    

In [75]:
from pyspark.sql.functions import date_add, date_sub, datediff, date_format

In [74]:
df.select(min('date'), max('date'), datediff(max('date'), min('date')),
          date_sub(min('date'), 10), date_add(max('date'), 10)).show()

+----------+----------+------------------------------+-----------------------+-----------------------+
| min(date)| max(date)|datediff(max(date), min(date))|date_sub(min(date), 10)|date_add(max(date), 10)|
+----------+----------+------------------------------+-----------------------+-----------------------+
|2023-05-11|2024-05-09|                           364|             2023-05-01|             2024-05-19|
+----------+----------+------------------------------+-----------------------+-----------------------+



In [76]:
df.select(date_format('date', 'dd/MMM/yyyy')).show(5)

+------------------------------+
|date_format(date, dd/MMM/yyyy)|
+------------------------------+
|                   23/Jan/2024|
|                   20/Nov/2023|
|                   12/Jan/2024|
|                   06/Aug/2023|
|                   03/May/2024|
+------------------------------+
only showing top 5 rows
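
As a cross-check of the date results above, the same arithmetic can be sketched with Python's standard `datetime` module. This is plain Python, not Spark; the two dates are copied from the `min(date)` and `max(date)` output, and the `strftime` pattern is only an approximate counterpart of Spark's `'dd/MMM/yyyy'` (the month abbreviation depends on the locale).

```python
from datetime import date, timedelta

# min(date) and max(date) as shown in the output above
first, last = date(2023, 5, 11), date(2024, 5, 9)

diff = (last - first).days               # like datediff(max(date), min(date))
earlier = first - timedelta(days=10)     # like date_sub(min(date), 10)
later = last + timedelta(days=10)        # like date_add(max(date), 10)
formatted = date(2024, 1, 23).strftime('%d/%b/%Y')  # like date_format(date, 'dd/MMM/yyyy')

print(diff, earlier, later, formatted)
```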



## Part 15 - RDD