# Faculty Noteebook

## Read data

In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import isnull, when, count, col,avg
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
 
sc = SparkContext.getOrCreate()
spark = SparkSession(sc) 

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/07 17:01:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/07 17:01:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/08/07 17:01:34 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/08/07 17:01:34 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
22/08/07 17:01:34 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
22/08/07 17:01:34 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.


In [2]:
file_location = "HeartStroke.csv"
file_type = "csv"

# CSV options
infer_schema = True
first_row_is_header = True
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
rawstrokeDF = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

[Stage 1:>                                                          (0 + 1) / 1]                                                                                

In [3]:
rawstrokeDF.show(5, False)

+------+----+--------+------------+------+-------------+---------------+-----+
|gender|age |diabetes|hypertension|stroke|heart_disease|smoking_history|BMI  |
+------+----+--------+------------+------+-------------+---------------+-----+
|Female|80.0|0       |0           |No    |Yes          |never          |25.19|
|Female|54.0|0       |0           |No    |No           |null           |null |
|Male  |28.0|0       |0           |No    |No           |never          |null |
|Female|36.0|0       |0           |No    |No           |current        |23.45|
|Male  |76.0|0       |1           |No    |Yes          |current        |20.14|
+------+----+--------+------------+------+-------------+---------------+-----+
only showing top 5 rows



## Check missing values
(For this exercise we will drop all the na values)

In [4]:
from pyspark.sql.functions import isnull, when, count, col
rawstrokeDF.select([count(when(isnull(c), c)).alias(c) for c in rawstrokeDF.columns]).show()

+------+---+--------+------------+------+-------------+---------------+-----+
|gender|age|diabetes|hypertension|stroke|heart_disease|smoking_history|  BMI|
+------+---+--------+------------+------+-------------+---------------+-----+
|     0|  0|       0|           0|     0|            0|          35816|25444|
+------+---+--------+------------+------+-------------+---------------+-----+



The columns 'smoking history' and 'BMI' has missing values. Let's drop them

In [5]:
# Use df_name.na.drop() to drop all the null values from the dataframe
rawstrokeDF = rawstrokeDF.na.drop()

In [6]:
# Check if the null value still exist
from pyspark.sql.functions import isnull, when, count, col
rawstrokeDF.select([count(when(isnull(c), c)).alias(c) for c in rawstrokeDF.columns]).show()

+------+---+--------+------------+------+-------------+---------------+---+
|gender|age|diabetes|hypertension|stroke|heart_disease|smoking_history|BMI|
+------+---+--------+------------+------+-------------+---------------+---+
|     0|  0|       0|           0|     0|            0|              0|  0|
+------+---+--------+------------+------+-------------+---------------+---+



In [7]:
rawstrokeDF.show(5, False)

+------+----+--------+------------+------+-------------+---------------+-----+
|gender|age |diabetes|hypertension|stroke|heart_disease|smoking_history|BMI  |
+------+----+--------+------------+------+-------------+---------------+-----+
|Female|80.0|0       |0           |No    |Yes          |never          |25.19|
|Female|36.0|0       |0           |No    |No           |current        |23.45|
|Male  |76.0|0       |1           |No    |Yes          |current        |20.14|
|Female|44.0|1       |0           |No    |No           |never          |19.31|
|Male  |42.0|0       |0           |No    |No           |never          |33.64|
+------+----+--------+------------+------+-------------+---------------+-----+
only showing top 5 rows



## Check and confirm the data type of each column

In [8]:
rawstrokeDF.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)



The variable values for any supervised ML algorithm has to be of type double. Let us convert the columns "diabetes", "hypertension" and target varaible "stroke" data type into type double

In [9]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col
intcols = ["diabetes", "hypertension"]

for col_name in intcols:
    strokeDF = rawstrokeDF.withColumn(col_name, col(col_name).cast(DoubleType()))

# We will also rename the column name 'stroke' as 'label' for 

strokeDF.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- hypertension: double (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)



## Transformations

#### Binarizer

Lets use divide the BMi into two groups: Obese and healthy. 1 represents 'obese' and 0 represents 'healthy' (If your BMI is 30.0 or higher, it falls within the obese range)
We will use the Binarizer transformer to create a new variable 'Body Type' (1- obese and 0- healthy) by binarizing the 'BMI' variable by setting the obesity threshold value 30.0. Binarization is used for thresholding numerical feature to binary feature (0 or 1)

In [10]:
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(inputCol="BMI", outputCol="BodyType", threshold=30.0)
binarizedDF = binarizer.transform(strokeDF)
binarizedDF.select('BMI', 'BodyType').show(5,False)

+-----+--------+
|BMI  |BodyType|
+-----+--------+
|25.19|0.0     |
|23.45|0.0     |
|20.14|0.0     |
|19.31|0.0     |
|33.64|1.0     |
+-----+--------+
only showing top 5 rows



In [11]:
binarizedDF.show(5)

+------+----+--------+------------+------+-------------+---------------+-----+--------+
|gender| age|diabetes|hypertension|stroke|heart_disease|smoking_history|  BMI|BodyType|
+------+----+--------+------------+------+-------------+---------------+-----+--------+
|Female|80.0|       0|         0.0|    No|          Yes|          never|25.19|     0.0|
|Female|36.0|       0|         0.0|    No|           No|        current|23.45|     0.0|
|  Male|76.0|       0|         1.0|    No|          Yes|        current|20.14|     0.0|
|Female|44.0|       1|         0.0|    No|           No|          never|19.31|     0.0|
|  Male|42.0|       0|         0.0|    No|           No|          never|33.64|     1.0|
+------+----+--------+------------+------+-------------+---------------+-----+--------+
only showing top 5 rows



From the above result we can see that the value of the target feature label is now converted to binary values

#### Bucketizer

We now group the patients based on their age group. Here, we will use the Bucketizer transformer. Bucketizer is used for creating group of values of a continuous feature

In [12]:
from pyspark.ml.feature import Bucketizer
# lets define the age age group splits
splits = [0, 25.0, 50.0, 75.0, 100.0]
bucketizer = Bucketizer(inputCol="age", outputCol="ageGroup", splits=splits)
bucketizedDF = bucketizer.transform(binarizedDF)
bucketizedDF.select('age', 'ageGroup').show(10,False)

+----+--------+
|age |ageGroup|
+----+--------+
|80.0|3.0     |
|36.0|1.0     |
|76.0|3.0     |
|44.0|1.0     |
|42.0|1.0     |
|54.0|2.0     |
|78.0|3.0     |
|67.0|2.0     |
|15.0|0.0     |
|42.0|1.0     |
+----+--------+
only showing top 10 rows



In [13]:
bucketizedDF.columns

['gender',
 'age',
 'diabetes',
 'hypertension',
 'stroke',
 'heart_disease',
 'smoking_history',
 'BMI',
 'BodyType',
 'ageGroup']

In [14]:
bucketizedDF.select('age', 'ageGroup').show(10,False)

+----+--------+
|age |ageGroup|
+----+--------+
|80.0|3.0     |
|36.0|1.0     |
|76.0|3.0     |
|44.0|1.0     |
|42.0|1.0     |
|54.0|2.0     |
|78.0|3.0     |
|67.0|2.0     |
|15.0|0.0     |
|42.0|1.0     |
+----+--------+
only showing top 10 rows



#### StringIndexer

There are three categorical variables in our dataset viz., 'gender', 'heart disease' and 'smoking history'. These variables cannot be directly passed to our ML algorithms. We will converet them into indexes and to do that we will use StringIndexer transformer. StringIndexer converts a string column to an index column. The most frequent label gets index 0

In [15]:
from pyspark.ml.feature import StringIndexer
indexers = StringIndexer(inputCols= ['stroke','gender', 'heart_disease', 'smoking_history'], 
                         outputCols=['label', 'gender_indexed', 'heart_disease_indexed', 'smoking_history_indexed'])
strindexedDF = indexers.fit(bucketizedDF).transform(bucketizedDF)
strindexedDF.select('stroke', 'label', 'gender', 'gender_indexed', 'heart_disease', 'heart_disease_indexed', 
                    'smoking_history', 'smoking_history_indexed').show(5, False)

+------+-----+------+--------------+-------------+---------------------+---------------+-----------------------+
|stroke|label|gender|gender_indexed|heart_disease|heart_disease_indexed|smoking_history|smoking_history_indexed|
+------+-----+------+--------------+-------------+---------------------+---------------+-----------------------+
|No    |0.0  |Female|0.0           |Yes          |1.0                  |never          |0.0                    |
|No    |0.0  |Female|0.0           |No           |0.0                  |current        |2.0                    |
|No    |0.0  |Male  |1.0           |Yes          |1.0                  |current        |2.0                    |
|No    |0.0  |Female|0.0           |No           |0.0                  |never          |0.0                    |
|No    |0.0  |Male  |1.0           |No           |0.0                  |never          |0.0                    |
+------+-----+------+--------------+-------------+---------------------+---------------+--------

From the above output you can observe that the categorical columns has got converted to their respective indices columns. The most frequent label gets index 0 and so on ordered by label frequencies

#### OneHotEncoderEstimator

In [16]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols= ["gender_indexed", 'heart_disease_indexed', 'smoking_history_indexed'], 
                         outputCols=["genderVec", 'heart_diseaseVec', 'smoking_historyVec'])
encodedDF = encoder.fit(strindexedDF).transform(strindexedDF)
encodedDF.select('gender_indexed', 'genderVec', 'heart_disease_indexed', 'heart_diseaseVec', 
                    'smoking_history_indexed', 'smoking_historyVec',).show(5, False)

+--------------+-------------+---------------------+----------------+-----------------------+------------------+
|gender_indexed|genderVec    |heart_disease_indexed|heart_diseaseVec|smoking_history_indexed|smoking_historyVec|
+--------------+-------------+---------------------+----------------+-----------------------+------------------+
|0.0           |(2,[0],[1.0])|1.0                  |(1,[],[])       |0.0                    |(4,[0],[1.0])     |
|0.0           |(2,[0],[1.0])|0.0                  |(1,[0],[1.0])   |2.0                    |(4,[2],[1.0])     |
|1.0           |(2,[1],[1.0])|1.0                  |(1,[],[])       |2.0                    |(4,[2],[1.0])     |
|0.0           |(2,[0],[1.0])|0.0                  |(1,[0],[1.0])   |0.0                    |(4,[0],[1.0])     |
|1.0           |(2,[1],[1.0])|0.0                  |(1,[0],[1.0])   |0.0                    |(4,[0],[1.0])     |
+--------------+-------------+---------------------+----------------+-----------------------+---

In [17]:
encodedDF.show(5)

+------+----+--------+------------+------+-------------+---------------+-----+--------+--------+-----+--------------+---------------------+-----------------------+-------------+----------------+------------------+
|gender| age|diabetes|hypertension|stroke|heart_disease|smoking_history|  BMI|BodyType|ageGroup|label|gender_indexed|heart_disease_indexed|smoking_history_indexed|    genderVec|heart_diseaseVec|smoking_historyVec|
+------+----+--------+------------+------+-------------+---------------+-----+--------+--------+-----+--------------+---------------------+-----------------------+-------------+----------------+------------------+
|Female|80.0|       0|         0.0|    No|          Yes|          never|25.19|     0.0|     3.0|  0.0|           0.0|                  1.0|                    0.0|(2,[0],[1.0])|       (1,[],[])|     (4,[0],[1.0])|
|Female|36.0|       0|         0.0|    No|           No|        current|23.45|     0.0|     1.0|  0.0|           0.0|                  0.0|     

### VectorAssembler
MLlib expects all features to be contained within a single column. VectorAssembler combines multiple columns and gives single column as output

In [18]:
# Import VectorAssembler from pyspark.ml.feature package
from pyspark.ml.feature import VectorAssembler
# Create a list of all the variables that you want to create feature vectors
# These features are then further used for training model
features_col = ["age", "diabetes", "hypertension", "BMI", "BodyType", "ageGroup", 
                "genderVec","heart_diseaseVec","smoking_historyVec"]
# Create the VectorAssembler object
assembler = VectorAssembler(inputCols= features_col, outputCol= "features")
assembledDF = assembler.transform(encodedDF)
assembledDF.select("features").show(5, False)

+-----------------------------------------------------+
|features                                             |
+-----------------------------------------------------+
|(13,[0,3,5,6,9],[80.0,25.19,3.0,1.0,1.0])            |
|(13,[0,3,5,6,8,11],[36.0,23.45,1.0,1.0,1.0,1.0])     |
|(13,[0,2,3,5,7,11],[76.0,1.0,20.14,3.0,1.0,1.0])     |
|(13,[0,1,3,5,6,8,9],[44.0,1.0,19.31,1.0,1.0,1.0,1.0])|
|(13,[0,3,4,5,7,8,9],[42.0,33.64,1.0,1.0,1.0,1.0,1.0])|
+-----------------------------------------------------+
only showing top 5 rows



Now that all the features are vectorized. Let us convert the target variable 'stroke' (integer) in to a label column

All the features are now in one single feature vector. If you notice, the feature column contains sparse vector. In order to perform scaling on the data, we must convert the sparse vector to dense vector

In [19]:
assembledDF.columns

['gender',
 'age',
 'diabetes',
 'hypertension',
 'stroke',
 'heart_disease',
 'smoking_history',
 'BMI',
 'BodyType',
 'ageGroup',
 'label',
 'gender_indexed',
 'heart_disease_indexed',
 'smoking_history_indexed',
 'genderVec',
 'heart_diseaseVec',
 'smoking_historyVec',
 'features']

### VectorIndexer
VectorIndexer automatically identifies the categorical features from the feature vector (output from VectorAssembler). It then indexes categorical features inside of a Vector
It is the vectorized version of StringIndexer

In [20]:
# Import VectorIndexer from pyspark.ml.feature package
from pyspark.ml.feature import VectorIndexer
# Create a list of all the raw features
# VectorIndexer will automatically identify the categorical columns and index them
featurecol = ['age', 'diabetes','hypertension', 'BMI','BodyType','ageGroup', 
              "gender_indexed", 'heart_disease_indexed', 'smoking_history_indexed']

# Create the VectorAssembler object
assembler = VectorAssembler(inputCols= featurecol, outputCol= "features")
assembledDF = assembler.transform(strindexedDF)

# Create the VectorIndexer object. It only take feature column
vecindexer = VectorIndexer(inputCol= "features", outputCol= "indexed_features")
# Fit the vectorindexer object on the output of the vectorassembler data and transform
vecindexedDF = vecindexer.fit(assembledDF).transform(assembledDF)
vecindexedDF.select("features", "indexed_features").show(5, False)

+----------------------------------------+----------------------------------------+
|features                                |indexed_features                        |
+----------------------------------------+----------------------------------------+
|(9,[0,3,5,7],[80.0,25.19,3.0,1.0])      |(9,[0,3,5,7],[80.0,25.19,3.0,1.0])      |
|(9,[0,3,5,8],[36.0,23.45,1.0,2.0])      |(9,[0,3,5,8],[36.0,23.45,1.0,2.0])      |
|[76.0,0.0,1.0,20.14,0.0,3.0,1.0,1.0,2.0]|[76.0,0.0,1.0,20.14,0.0,3.0,1.0,1.0,2.0]|
|(9,[0,1,3,5],[44.0,1.0,19.31,1.0])      |(9,[0,1,3,5],[44.0,1.0,19.31,1.0])      |
|[42.0,0.0,0.0,33.64,1.0,1.0,1.0,0.0,0.0]|[42.0,0.0,0.0,33.64,1.0,1.0,1.0,0.0,0.0]|
+----------------------------------------+----------------------------------------+
only showing top 5 rows



VectorIndexer let us skip the one hot encoding stage for encoding the categorical features. As discussed earlier, we should not use one hot encoding on categorical variables for algorithms like decision tree and tree ensembles. VectorIndexer are chosen over OneHotEncoderEstimator in such scenario which allows these algorithms to treat categorical features appropriately

### StandardScaler
StandardScaler scales each value in the feature vector such that the mean is 0 and the standard deviation is 1
<br>It takes parameters:
<br>withStd: True by default. Scales the data to unit standard deviation
<br>withMean: False by default. Centers the data with mean before scaling

**If you notice the output of vectorassembler data is a sparse vector. This need to be converted into a dense vector befor applying scaling**

In [21]:
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT

# Define a udf that converts sparse vector into dense vector
# You cannot create your own custom function and run that against the data directly. 
# In Spark, You have to register the function first using udf function
sparseToDense = F.udf(lambda v : Vectors.dense(v), VectorUDT())

# We then call the function here passing the column name on which the function has to be applied
densefeatureDF = assembledDF.withColumn('features_array', sparseToDense('features'))

densefeatureDF.select("features", "features_array").show(5, False)

[Stage 23:>                                                         (0 + 1) / 1]

+----------------------------------------+----------------------------------------+
|features                                |features_array                          |
+----------------------------------------+----------------------------------------+
|(9,[0,3,5,7],[80.0,25.19,3.0,1.0])      |[80.0,0.0,0.0,25.19,0.0,3.0,0.0,1.0,0.0]|
|(9,[0,3,5,8],[36.0,23.45,1.0,2.0])      |[36.0,0.0,0.0,23.45,0.0,1.0,0.0,0.0,2.0]|
|[76.0,0.0,1.0,20.14,0.0,3.0,1.0,1.0,2.0]|[76.0,0.0,1.0,20.14,0.0,3.0,1.0,1.0,2.0]|
|(9,[0,1,3,5],[44.0,1.0,19.31,1.0])      |[44.0,1.0,0.0,19.31,0.0,1.0,0.0,0.0,0.0]|
|[42.0,0.0,0.0,33.64,1.0,1.0,1.0,0.0,0.0]|[42.0,0.0,0.0,33.64,1.0,1.0,1.0,0.0,0.0]|
+----------------------------------------+----------------------------------------+
only showing top 5 rows



                                                                                

In [22]:
densefeatureDF.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- hypertension: double (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- BodyType: double (nullable = true)
 |-- ageGroup: double (nullable = true)
 |-- label: double (nullable = false)
 |-- gender_indexed: double (nullable = false)
 |-- heart_disease_indexed: double (nullable = false)
 |-- smoking_history_indexed: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- features_array: vector (nullable = true)



In [23]:
# Import StandardScaler from pyspark.ml.feature package
from pyspark.ml.feature import StandardScaler

# Create the StandardScaler object. It only take feature column (dense vector)
stdscaler = StandardScaler(inputCol= "features_array", outputCol= "scaledfeatures")

# Fit the StandardScaler object on the output of the dense vector data and transform
stdscaledDF = stdscaler.fit(densefeatureDF).transform(densefeatureDF)
stdscaledDF.select("scaledfeatures" ).show(5, False)

                                                                                

+-----------------------------------------------------------------------------------------------------------------------------------------+
|scaledfeatures                                                                                                                           |
+-----------------------------------------------------------------------------------------------------------------------------------------+
|[4.087032486459775,0.0,0.0,3.4969597048301693,0.0,3.465141331509375,0.0,4.588062443054963,0.0]                                           |
|[1.839164618906899,0.0,0.0,3.255407109101527,0.0,1.155047110503125,0.0,0.0,1.5657422910924597]                                           |
|[3.882680862136787,0.0,3.266884631584384,2.795901883893593,0.0,3.465141331509375,2.0514943144150433,4.588062443054963,1.5657422910924597]|
|[2.2478678675528765,3.122394188242338,0.0,2.680678519264413,0.0,1.155047110503125,0.0,0.0,0.0]                                           |
|[2.145692055391382,

From the above output we can see that the features are scaled to unit standard deviation.

In [24]:
stdscaledDF.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- hypertension: double (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- BodyType: double (nullable = true)
 |-- ageGroup: double (nullable = true)
 |-- label: double (nullable = false)
 |-- gender_indexed: double (nullable = false)
 |-- heart_disease_indexed: double (nullable = false)
 |-- smoking_history_indexed: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- features_array: vector (nullable = true)
 |-- scaledfeatures: vector (nullable = true)



### MinMaxScaler
For MinMaxScaler you dont need to

In [25]:
# Import MinMaxScaler from pyspark.ml.feature package
from pyspark.ml.feature import MinMaxScaler

# Create the MinMaxScaler object. It only take feature column (dense vector)
mmxscaler = MinMaxScaler(inputCol= "features_array", outputCol= "mmxscaledfeatures", )

# Fit the MinMaxScaler object on the output of the dense vector data and transform
mmxscaledDF = mmxscaler.fit(densefeatureDF).transform(densefeatureDF)
mmxscaledDF.select("mmxscaledfeatures" ).show(5, False)

                                                                                

+-----------------------------------------------------------------------------------+
|mmxscaledfeatures                                                                  |
+-----------------------------------------------------------------------------------+
|(9,[0,3,5,7],[0.9999999999999999,0.18485441644237832,1.0,1.0])                     |
|(9,[0,3,5,8],[0.44889779559118237,0.1635674088573526,0.3333333333333333,0.5])      |
|[0.9498997995991983,0.0,1.0,0.12307315879618304,0.0,1.0,0.5,1.0,0.5]               |
|(9,[0,1,3,5],[0.5490981963927856,1.0,0.11291901149987765,0.3333333333333333])      |
|[0.5240480961923848,0.0,0.0,0.28823097626620997,1.0,0.3333333333333333,0.5,0.0,0.0]|
+-----------------------------------------------------------------------------------+
only showing top 5 rows



### Normalizer

In [26]:
# Import norm from pyspark.ml.feature package
from pyspark.ml.feature import Normalizer

# Create the norm object. It only take feature column (dense vector)
normscaler = Normalizer(inputCol= "features_array", outputCol= "normscaledfeatures")

# Fit the norm object on the output of the dense vector data and transform
normscaledDF = normscaler.transform(densefeatureDF)
normscaledDF.select("normscaledfeatures" ).show(5, False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|normscaledfeatures                                                                                                                                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|[0.9531555641945476,0.0,0.0,0.3001248582757582,0.0,0.03574333365729553,0.0,0.011914444552431845,0.0]                                                   |
|[0.8367789176331486,0.0,0.0,0.5450684894027037,0.0,0.023243858823143018,0.0,0.0,0.046487717646286036]                                                  |
|[0.9653863060221477,0.0,0.012702451395028258,0.2558273710958691,0.0,0.03810735418508478,0.012702451395028258,0.012702451395028258,0.025404902790056517]|
|[0.9153018623338187,0.020802315053041334,0.0,0.4016927036742281,0.0,0.02080

In [27]:
normscaledDF.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- hypertension: double (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- BodyType: double (nullable = true)
 |-- ageGroup: double (nullable = true)
 |-- label: double (nullable = false)
 |-- gender_indexed: double (nullable = false)
 |-- heart_disease_indexed: double (nullable = false)
 |-- smoking_history_indexed: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- features_array: vector (nullable = true)
 |-- normscaledfeatures: vector (nullable = true)



##### You can use any of the above scaled data for training your model

## Spark ML Algorithms
We will now train the ML models with the data that we have worked upon so far. We will build classification model since, given the data, we need to determine if a person will get a stroke or not

### Train-Test Split
We split the output of  data into training and test sets (30% held out for testing)
Note: This train-test split of for logistic regression

In [28]:
# We spilt the data into 70-30 set
# Training Set - 70% obesevations
# Testing Set - 30% observations
trainDF, testDF =  assembledDF.randomSplit([0.7,0.3], seed = 2020)

# print the count of observations in each set
print("Observations in training set = ", trainDF.count())
print("Observations in testing set = ", testDF.count())

Observations in training set =  36680
Observations in testing set =  15495


### Supervised Learning - Classification 
#### Logistic Regression

In [29]:
# import the LogisticRegression function from the pyspark.ml.classification package
from pyspark.ml.classification import LogisticRegression

# Build the LogisticRegression object 'lr' by setting the required parameters
lr = LogisticRegression(featuresCol="features", labelCol="label",maxIter= 10,regParam=0.3, elasticNetParam=0.8)

# fit the LogisticRegression object on the training data
lrmodel = lr.fit(trainDF)

#This LogisticRegressionModel can be used as a transformer to perform prediction on the testing data
predictonDF = lrmodel.transform(testDF)

predictonDF.select("label","rawPrediction", "probability", "prediction").show(10,False)

22/08/07 17:01:54 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
22/08/07 17:01:54 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/08/07 17:01:54 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
+-----+--------------------------------------+-----------------------------------------+----------+
|label|rawPrediction                         |probability                              |prediction|
+-----+--------------------------------------+-----------------------------------------+----------+
|0.0  |[4.026156743176436,-4.026156743176436]|[0.9824700109051254,0.017529989094874576]|0.0       |
|0.0  |[4.026156743176436,-4.026156743176436]|[0.9824700109051254,0.017529989094874576]|0.0       |
|0.0  |[4.026156743176436,-4.026156743176436]|[0.9824700109051254,0.017529989094874576]|0.0       |
|0.0  |[4.026156743176436,-4.026156

In [30]:
predictonDF.select("label","rawPrediction", "probability", "prediction").show(50,False)

+-----+--------------------------------------+-----------------------------------------+----------+
|label|rawPrediction                         |probability                              |prediction|
+-----+--------------------------------------+-----------------------------------------+----------+
|0.0  |[4.026156743176436,-4.026156743176436]|[0.9824700109051254,0.017529989094874576]|0.0       |
|0.0  |[4.026156743176436,-4.026156743176436]|[0.9824700109051254,0.017529989094874576]|0.0       |
|0.0  |[4.026156743176436,-4.026156743176436]|[0.9824700109051254,0.017529989094874576]|0.0       |
|0.0  |[4.026156743176436,-4.026156743176436]|[0.9824700109051254,0.017529989094874576]|0.0       |
|0.0  |[4.026156743176436,-4.026156743176436]|[0.9824700109051254,0.017529989094874576]|0.0       |
|0.0  |[4.026156743176436,-4.026156743176436]|[0.9824700109051254,0.017529989094874576]|0.0       |
|0.0  |[4.026156743176436,-4.026156743176436]|[0.9824700109051254,0.017529989094874576]|0.0       |


##### Model Evaluation

In [31]:
# import BinaryClassificationEvaluator from the pyspark.ml.evaluation package
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Build the BinaryClassificationEvaluator object 'evaluator'
evaluator = BinaryClassificationEvaluator()

# Calculate the accracy and print its value
accuracy = predictonDF.filter(predictonDF.label == predictonDF.prediction).count()/float(predictonDF.count())
print("Accuracy = ", accuracy)

# evaluate(predictiondataframe) gets area under the ROC curve
print('Area under the ROC curve = ', evaluator.evaluate(predictonDF))

Accuracy =  0.98134882220071
Area under the ROC curve =  0.5


[Stage 49:>                                                         (0 + 1) / 1]                                                                                

The accuracy of our model is 98.13 % and the area under the ROC curve is 0.5. You can also find these metrics using the model summary as below

In [32]:
# Create model summary object
lrmodelSummary = lrmodel.summary

# Print the following metrics one by one: 
# 1. Accuracy
# Accuracy is a model summary parameter
print("Accuracy = ", lrmodelSummary.accuracy)
# 2. Area under the ROC curve
# Area under the ROC curve is a model summary parameter
print("Area under the ROC curve = ", lrmodelSummary.areaUnderROC)
# 3. Precision (Positive Predictive Value)
# Precision is a model summary parameter
print("Precision = ", lrmodelSummary.weightedPrecision)
# 4. Recall (True Positive Rate)
# Recall is a model summary parameter
print("Recall = ", lrmodelSummary.weightedRecall)
# 5. F1 Score (F-measure)
# F1 Score is a model summary method
print("F1 Score = ", lrmodelSummary.weightedFMeasure())


Accuracy =  0.9824700109051254
Area under the ROC curve =  0.5
Precision =  0.9652473223279173
Recall =  0.9824700109051254
F1 Score =  0.973782520813235


#### Decision Tree

In [33]:
# We spilt the data into 70-30 set
# Training Set - 70% obesevations
# Testing Set - 30% observations
trainDF, testDF =  vecindexedDF.randomSplit([0.7,0.3], seed = 2020)

# print the count of observations in each set
print("Observations in training set = ", trainDF.count())
print("Observations in testing set = ", testDF.count())

Observations in training set =  36680
Observations in testing set =  15495


In [34]:
# import the DecisionTree function from the pyspark.ml.classification package
from pyspark.ml.classification import DecisionTreeClassifier

# Build the DecisionTree object 'dt' by setting the required parameters
# We will pass the VectorIndexed columns as featureCol for Decision Tree. Since they can handle categorical indexes
dt = DecisionTreeClassifier(featuresCol="indexed_features", labelCol="label",maxDepth= 10)

# fit the DecisionTree object on the training data
dtmodel = dt.fit(trainDF)

#This DecisionTreeModel can be used as a transformer to perform prediction on the testing data
dtpredictonDF = dtmodel.transform(testDF)

dtpredictonDF.select("label","rawPrediction", "probability", "prediction").show(10,False)

+-----+-------------+-----------------------------------------+----------+
|label|rawPrediction|probability                              |prediction|
+-----+-------------+-----------------------------------------+----------+
|0.0  |[9916.0,4.0] |[0.9995967741935484,4.032258064516129E-4]|0.0       |
|0.0  |[9916.0,4.0] |[0.9995967741935484,4.032258064516129E-4]|0.0       |
|0.0  |[9916.0,4.0] |[0.9995967741935484,4.032258064516129E-4]|0.0       |
|0.0  |[9916.0,4.0] |[0.9995967741935484,4.032258064516129E-4]|0.0       |
|0.0  |[9916.0,4.0] |[0.9995967741935484,4.032258064516129E-4]|0.0       |
|0.0  |[9916.0,4.0] |[0.9995967741935484,4.032258064516129E-4]|0.0       |
|0.0  |[9916.0,4.0] |[0.9995967741935484,4.032258064516129E-4]|0.0       |
|0.0  |[9916.0,4.0] |[0.9995967741935484,4.032258064516129E-4]|0.0       |
|0.0  |[9916.0,4.0] |[0.9995967741935484,4.032258064516129E-4]|0.0       |
|0.0  |[9916.0,4.0] |[0.9995967741935484,4.032258064516129E-4]|0.0       |
+-----+-------------+----

##### Model Evaluation

In [35]:
# import BinaryClassificationEvaluator from the pyspark.ml.evaluation package
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Build the BinaryClassificationEvaluator object 'evaluator'
evaluator = BinaryClassificationEvaluator()

# Calculate the accracy and print its value
accuracy = dtpredictonDF.filter(dtpredictonDF.label == dtpredictonDF.prediction).count()/float(dtpredictonDF.count())
print("Accuracy = ", accuracy)

# evaluate(predictiondataframe) gets area under the ROC curve
print('Area under the ROC curve = ', evaluator.evaluate(dtpredictonDF))

[Stage 100:>                                                        (0 + 1) / 1]                                                                                

Accuracy =  0.9780574378831881
Area under the ROC curve =  0.5421588500623729


In [36]:
# import MulticlassClassificationEvaluator from the pyspark.ml.evaluation package
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Build the MulticlassClassificationEvaluator object 'evaluator'
multievaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# 1. Accuracy
print("Accuracy: ", multievaluator.evaluate(dtpredictonDF, {evaluator.metricName: "accuracy"})) 
# 2. Area under the ROC curve
print('Area under the ROC curve = ', evaluator.evaluate(dtpredictonDF))
# 3. Precision (Positive Predictive Value)
print("Precision = ", multievaluator.evaluate(dtpredictonDF, {evaluator.metricName: "weightedPrecision"}))
# 4. Recall (True Positive Rate)
print("Recall = ", multievaluator.evaluate(dtpredictonDF, {evaluator.metricName: "weightedRecall"}))
# 5. F1 Score (F-measure)
print("F1 Score = ", multievaluator.evaluate(dtpredictonDF, {evaluator.metricName: "f1"}))

Accuracy:  0.970888654696996
Area under the ROC curve =  0.5421588500623729
Precision =  0.970888654696996
Recall =  0.970888654696996
F1 Score =  0.970888654696996


#### Random Forest

In [37]:
# import the RandomForestClassifier function from the pyspark.ml.classification package
from pyspark.ml.classification import RandomForestClassifier

# Build the RandomForestClassifier object 'dt' by setting the required parameters
# We will pass the VectorIndexed columns as featureCol for Random Forest. Since they can handle categorical indexes
rf = RandomForestClassifier(featuresCol="indexed_features", labelCol="label")

# fit the RandomForestClassifier object on the training data
rfmodel = rf.fit(trainDF)

#This RandomForestClassifierModel can be used as a transformer to perform prediction on the testing data
rfpredictonDF = rfmodel.transform(testDF)

rfpredictonDF.select("label","rawPrediction", "probability", "prediction").show(10,False)

[Stage 136:>                                                        (0 + 1) / 1]                                                                                

+-----+---------------------------------------+----------------------------------------+----------+
|label|rawPrediction                          |probability                             |prediction|
+-----+---------------------------------------+----------------------------------------+----------+
|0.0  |[19.704623707682273,0.2953762923177289]|[0.9852311853841135,0.01476881461588644]|0.0       |
|0.0  |[19.704623707682273,0.2953762923177289]|[0.9852311853841135,0.01476881461588644]|0.0       |
|0.0  |[19.704623707682273,0.2953762923177289]|[0.9852311853841135,0.01476881461588644]|0.0       |
|0.0  |[19.704623707682273,0.2953762923177289]|[0.9852311853841135,0.01476881461588644]|0.0       |
|0.0  |[19.704623707682273,0.2953762923177289]|[0.9852311853841135,0.01476881461588644]|0.0       |
|0.0  |[19.704623707682273,0.2953762923177289]|[0.9852311853841135,0.01476881461588644]|0.0       |
|0.0  |[19.704623707682273,0.2953762923177289]|[0.9852311853841135,0.01476881461588644]|0.0       |


##### Model Evaluation

In [38]:
# 1. Accuracy
print("Accuracy: ", multievaluator.evaluate(rfpredictonDF, {evaluator.metricName: "accuracy"})) 
# 2. Area under the ROC curve
print('Area under the ROC curve = ', evaluator.evaluate(rfpredictonDF))
# 3. Precision (Positive Predictive Value)
print("Precision = ", multievaluator.evaluate(rfpredictonDF, {evaluator.metricName: "weightedPrecision"}))
# 4. Recall (True Positive Rate)
print("Recall = ", multievaluator.evaluate(rfpredictonDF, {evaluator.metricName: "weightedRecall"}))
# 5. F1 Score (F-measure)
print("F1 Score = ", multievaluator.evaluate(rfpredictonDF, {evaluator.metricName: "f1"}))

Accuracy:  0.9721110185586135
Area under the ROC curve =  0.8166641787274829
Precision =  0.9721110185586135
Recall =  0.9721110185586135
F1 Score =  0.9721110185586135


### Building Pipeline

We will split the stroke df into train and test split and pass the training set to the pipeline

In [39]:
# We spilt the data into 70-30 set
# Training Set - 70% obesevations
# Testing Set - 30% observations
trainDF, testDF =  strokeDF.randomSplit([0.7,0.3], seed = 2020)

# print the count of observations in each set
print("Observations in training set = ", trainDF.count())
print("Observations in testing set = ", testDF.count())

Observations in training set =  36680
Observations in testing set =  15495


In [40]:
# import Pipeline from pyspark.ml package
from pyspark.ml import Pipeline

# Build the pipeline object by providing stages(transformers + Estimator) 
# that you need the dataframe to pass through
# Transfoermers - binarizer, bucketizer, indexers, encoder, assembler
# Estimator - lr
lrpipeline = Pipeline(stages=[binarizer, bucketizer, indexers, encoder, assembler, lr])

# fit the pipeline for the trainind data
lrpipelinemodel = lrpipeline.fit(trainDF)

# transform the data
lrpipelinepredicted = lrpipelinemodel.transform(testDF)

# view some of the columns generated
lrpipelinepredicted.select('label', 'rawPrediction', 'probability', 'prediction').show(10)

+-----+--------------------+--------------------+----------+
|label|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+----------+
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
+-----+--------------------+--------------------+----------+
only showing top 10 rows



In [41]:
# Build the MulticlassClassificationEvaluator object 'evaluator'
multievaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# 1. Accuracy
print("Accuracy: ", multievaluator.evaluate(lrpipelinepredicted, {evaluator.metricName: "accuracy"})) 
# 2. Area under the ROC curve
print('Area under the ROC curve = ', evaluator.evaluate(lrpipelinepredicted))
# 3. Precision (Positive Predictive Value)
print("Precision = ", multievaluator.evaluate(lrpipelinepredicted, {evaluator.metricName: "weightedPrecision"}))
# 4. Recall (True Positive Rate)
print("Recall = ", multievaluator.evaluate(lrpipelinepredicted, {evaluator.metricName: "weightedRecall"}))
# 5. F1 Score (F-measure)
print("F1 Score = ", multievaluator.evaluate(lrpipelinepredicted, {evaluator.metricName: "f1"}))

Accuracy:  0.9721110185586135
Area under the ROC curve =  0.5
Precision =  0.9721110185586135
Recall =  0.9721110185586135
F1 Score =  0.9721110185586135


### Model Persistence
Model persistence means saving your model to a disk. After you finalize your model for prediction depending upon the performance, you need to save the model to the disk. Let's say, you finalize 'lrpipelinemodel' to be used for in production environment i.e. in your application. We use the following code to save it.

##### Saving pipeline model

In [42]:
# use save() method to save the model
# write().overwrite() is usually used when you want to replace the older model with a new one
# It might happen that you wish to retrain your model and save it at the same the place
lrpipelinemodel.write().overwrite().save("lrmodel")

                                                                                

##### Loading pipeline model

In [43]:
# import PipelineModel from pyspark.ml package
from pyspark.ml import PipelineModel

# load the model from the location it is stored
# The loaded model acts as PipelineModel
pipemodel = PipelineModel.load("lrmodel")

# use the PipelineModel object to perform prediciton on test data. 
# Use .transform() to perfrom prediction
prediction = pipemodel.transform(testDF)

# print the results
prediction.select('label', 'rawPrediction', 'probability', 'prediction').show(5)

                                                                                

+-----+--------------------+--------------------+----------+
|label|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+----------+
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
|  0.0|[4.02615674317643...|[0.98247001090512...|       0.0|
+-----+--------------------+--------------------+----------+
only showing top 5 rows

