## Mahmoud Ali Group 4

![image.png](attachment:image.png)

##### **Good luck with taking your exam. Keep working and make your dreams all come true. Seeing the results of all of your hard work will make this struggle worth it. We’re all thinking of you.** 
<b><font color='blue'>AI-PRO Spark Team ITI</font></b>

## Objective:
- The objective of this project is to create a <b>Logistic Regression Classifier</b> to predict the <b>Stroke Condition</b>.
- <b>Stroke</b> is a condition in which the blood supply to part of the brain is interrupted (ischemic stroke) or a blood vessel in the brain bursts and bleeds (hemorrhagic stroke).
- It is required to obtain an <b>area under the ROC curve (AUC) > 0.65</b>.

### Data:
- Data is provided in csv format in a file named <b>healthcare-dataset-stroke-data.csv</b>

### Column names and data types are as follows:
- id, integer.
- gender, string.
- age, double.
- hypertension, integer.
- heart_disease, integer.
- ever_married, string.
- work_type, string.
- Residence_type, string.
- avg_glucose_level, double.
- bmi, double.
- smoking_status, string.
- stroke, integer <b>(Target Label)</b>.

If the person has had a stroke, the stroke label value is <b>"1"</b>; otherwise it is <b>"0"</b>.

## To perform this task, follow these steps:

### Create a spark session and import the required libraries

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("final Project").getOrCreate()
import pandas as pd 
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, StringType, StructType, StructField

### Create a data schema programmatically

In [2]:
!pip install pyspark

Successfully installed py4j-0.10.9.2 pyspark-3.2.0


### Read the data using the standard DataFrameReader (key, value) options format
- Provide the schema and any other required options.

In [5]:
df = spark.read.csv("/content/healthcare-dataset-stroke-data.csv", header="true", inferSchema="true")

### Explore the data 
#### You have to do the following:
- Print the Schema.
- Show the first 10 rows from the data.
- Explore null values and show how many null values in each column.
- Plot a count plot for the target label and <b>notice the graph</b>.
- Perform any additional EDA you find useful.

In [6]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [7]:
df.head(10)

[Row(id=9046, gender='Male', age=67.0, hypertension=0, heart_disease=1, ever_married='Yes', work_type='Private', Residence_type='Urban', avg_glucose_level=228.69, bmi='36.6', smoking_status='formerly smoked', stroke=1),
 Row(id=51676, gender='Female', age=61.0, hypertension=0, heart_disease=0, ever_married='Yes', work_type='Self-employed', Residence_type='Rural', avg_glucose_level=202.21, bmi='N/A', smoking_status='never smoked', stroke=1),
 Row(id=31112, gender='Male', age=80.0, hypertension=0, heart_disease=1, ever_married='Yes', work_type='Private', Residence_type='Rural', avg_glucose_level=105.92, bmi='32.5', smoking_status='never smoked', stroke=1),
 Row(id=60182, gender='Female', age=49.0, hypertension=0, heart_disease=0, ever_married='Yes', work_type='Private', Residence_type='Urban', avg_glucose_level=171.23, bmi='34.4', smoking_status='smokes', stroke=1),
 Row(id=1665, gender='Female', age=79.0, hypertension=1, heart_disease=0, ever_married='Yes', work_type='Self-employed', Re

In [8]:
df = df.replace('N/A', None)  # bmi marks missing values with the string 'N/A'; turn them into real nulls

In [9]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+
| id|gender|age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level|bmi|smoking_status|stroke|
+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+
|  0|     0|  0|           0|            0|           0|        0|             0|                0|201|             0|     0|
+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+



In [10]:
df.show(4)

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
| 9046|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|51676|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21|null|   never smoked|     1|
|31112|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|60182|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|     1|
+-----+------+----+------------+-------------+---------
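The exploration steps ask for a count plot of the target label, but the notebook does not include one. Below is a sketch, assuming matplotlib is available. The counts are aggregated in Spark and only the small per-class result is collected to the driver. The bar chart, together with the printed shares, makes the class imbalance visible. Both helper names are hypothetical.

```python
import matplotlib.pyplot as plt


def collect_label_counts(df, label_col="stroke"):
    """Count rows per label in Spark and return a small {label: count} dict."""
    return {row[label_col]: row["count"] for row in df.groupBy(label_col).count().collect()}


def plot_label_counts(counts, label_col="stroke"):
    """Draw a count plot from a {label: count} dict, print each class share, return the Axes."""
    labels = sorted(counts)
    _, ax = plt.subplots()
    ax.bar([str(label) for label in labels], [counts[label] for label in labels])
    ax.set_xlabel(label_col)
    ax.set_ylabel("count")
    total = sum(counts.values())
    for label in labels:
        print(f"{label_col}={label}: {counts[label]} rows ({counts[label] / total:.1%})")
    return ax


# Usage:
# ax = plot_label_counts(collect_label_counts(df))
```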

### Get the summary statistics of the age column
- You will find that the minimum age is about <b>0.08</b>.
- Remove rows where the age is below <b>2 years</b>.

In [11]:
df.describe('age').show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|              5110|
|   mean|43.226614481409015|
| stddev| 22.61264672311348|
|    min|              0.08|
|    max|              82.0|
+-------+------------------+



In [12]:
df_filtered = df.filter(df['age'] > 2)  # strictly greater than 2, so rows with age exactly 2 are dropped as well

In [13]:
df_filtered.describe().show()

+-------+------------------+------+-----------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+--------------+--------------------+
|summary|                id|gender|              age|       hypertension|      heart_disease|ever_married|work_type|Residence_type| avg_glucose_level|               bmi|smoking_status|              stroke|
+-------+------------------+------+-----------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+--------------+--------------------+
|  count|              4935|  4935|             4935|               4935|               4935|        4935|     4935|          4935|              4935|              4740|          4935|                4935|
|   mean| 36433.59675785208|  null|44.70982776089159|0.10091185410334347|0.05572441742654509|        null|     null|          null|106.52655927051633|29.257489451476793|       

### Working with gender & smoking_status columns:
- Select and show the gender & smoking_status columns
- Get the distinct values for each column.

In [14]:
df_filtered.select("gender","smoking_status").show()

+------+---------------+
|gender| smoking_status|
+------+---------------+
|  Male|formerly smoked|
|Female|   never smoked|
|  Male|   never smoked|
|Female|         smokes|
|Female|   never smoked|
|  Male|formerly smoked|
|  Male|   never smoked|
|Female|   never smoked|
|Female|        Unknown|
|Female|        Unknown|
|Female|   never smoked|
|Female|         smokes|
|Female|         smokes|
|  Male|        Unknown|
|Female|   never smoked|
|Female|   never smoked|
|  Male|         smokes|
|  Male|         smokes|
|Female|   never smoked|
|  Male|        Unknown|
+------+---------------+
only showing top 20 rows



In [15]:
df_filtered.select("gender","smoking_status").distinct().show()

+------+---------------+
|gender| smoking_status|
+------+---------------+
|  Male|         smokes|
|  Male|formerly smoked|
| Other|formerly smoked|
|Female|        Unknown|
|  Male|        Unknown|
|Female|formerly smoked|
|  Male|   never smoked|
|Female|         smokes|
|Female|   never smoked|
+------+---------------+



In [16]:
df_filtered.select("gender").distinct().show()

+------+
|gender|
+------+
|Female|
| Other|
|  Male|
+------+



In [17]:
df_filtered.select("smoking_status").distinct().show()

+---------------+
| smoking_status|
+---------------+
|         smokes|
|        Unknown|
|   never smoked|
|formerly smoked|
+---------------+



#### Change the string values of the columns into numerical values as follows:
1. Gender column:
    * Male = 1
    * Female = 0
    * Other = 0
2. Smoking Status column:
    * never smoked = 0
    * Unknown = 0.5
    * formerly smoked = 0.75
    * smokes = 1.0


In [18]:
dict_map = {
    # gender
    'Male': 1,
    'Female': 0,
    'Other': 0,
    # smoking_status
    'never smoked': 0,
    'Unknown': 0.5,
    'formerly smoked': 0.75,
    'smokes': 1.0,
}

In [19]:
def impute_title(title):
    # float() matters: a UDF declared with DoubleType() returns null for Python int results
    return float(dict_map[title])

In [20]:
# dict_map_func = F.udf(lambda x: impute_title(x), DoubleType())
# df_filtered = df_filtered.withColumn('Gender', dict_map_func('Gender'))

#### Show the output DataFrame
- Select and show the gender & smoking_status columns after value changing.
- Print schema for the new dataframe.

In [21]:
# Replace the category strings with numeric strings; the columns are cast to double further below.
# subset= limits each replacement to the column it belongs to.
df_filtered = df_filtered.replace({'Male': '1', 'Female': '0', 'Other': '0'}, subset=['gender'])
df_filtered = df_filtered.replace(
    {'never smoked': '0', 'Unknown': '0.5', 'formerly smoked': '0.75', 'smokes': '1.0'},
    subset=['smoking_status'],
)

In [22]:
df_filtered.createOrReplaceTempView('view')
spark.sql("SELECT Gender,smoking_status from view").show()

+------+--------------+
|Gender|smoking_status|
+------+--------------+
|     1|          0.75|
|     0|             0|
|     1|             0|
|     0|           1.0|
|     0|             0|
|     1|          0.75|
|     1|             0|
|     0|             0|
|     0|           0.5|
|     0|           0.5|
|     0|             0|
|     0|           1.0|
|     0|           1.0|
|     1|           0.5|
|     0|             0|
|     0|             0|
|     1|           1.0|
|     1|           1.0|
|     0|             0|
|     1|           0.5|
+------+--------------+
only showing top 20 rows



In [23]:
df_filtered.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [24]:
df_filtered = df_filtered.withColumn("Gender", df_filtered["Gender"].cast(DoubleType()))
df_filtered = df_filtered.withColumn("smoking_status", df_filtered["smoking_status"].cast(DoubleType()))
df_filtered = df_filtered.withColumn("Bmi", df_filtered["Bmi"].cast(DoubleType()))

In [25]:
df_filtered.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Gender: double (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- Bmi: double (nullable = true)
 |-- smoking_status: double (nullable = true)
 |-- stroke: integer (nullable = true)



### Deal with null values according to your data analysis.

In [26]:
Mean = df_filtered.agg({'Bmi':'mean'}).collect()[0][0]
df_filtered=df_filtered.fillna({'Bmi':Mean})

In [27]:
df_filtered.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+
| id|gender|age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level|bmi|smoking_status|stroke|
+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+
|  0|     0|  0|           0|            0|           0|        0|             0|                0|  0|             0|     0|
+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+



### Split the data into training and test dataframes:
- 80% training and 20% test.
- seed = 42.
- Save each dataframe as a parquet file

In [28]:
train,test=df_filtered.randomSplit([0.8, 0.2],seed=42)

### Read the saved train and test DataFrames:
- Use the dataframes you read in the subsequent steps.

In [29]:
train.show(5)

+---+------+----+------------+-------------+------------+---------+--------------+-----------------+------------------+--------------+------+
| id|Gender| age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level|               Bmi|smoking_status|stroke|
+---+------+----+------------+-------------+------------+---------+--------------+-----------------+------------------+--------------+------+
| 67|   0.0|17.0|           0|            0|          No|  Private|         Urban|            92.97|29.257489451476793|          0.75|     0|
| 77|   0.0|13.0|           0|            0|          No| children|         Rural|            85.81|              18.6|           0.5|     0|
| 91|   0.0|42.0|           0|            0|          No|  Private|         Urban|            98.53|              18.5|           0.0|     0|
| 99|   0.0|31.0|           0|            0|          No|  Private|         Urban|           108.89|              52.3|           0.5|     0|
|121| 

In [30]:
test.show(5)

+---+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+
| id|Gender| age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level| Bmi|smoking_status|stroke|
+---+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+
| 84|   1.0|55.0|           0|            0|         Yes|  Private|         Urban|            89.17|31.5|           0.0|     0|
|129|   0.0|24.0|           0|            0|          No|  Private|         Urban|            97.55|26.2|           0.0|     0|
|156|   0.0|33.0|           0|            0|         Yes|  Private|         Rural|            86.97|42.2|           0.0|     0|
|205|   0.0|43.0|           0|            0|         Yes|  Private|         Rural|            88.23|37.6|           0.5|     0|
|259|   1.0|79.0|           0|            0|         Yes|  Private|         Urban|           198.79|24.9

### Create the model:
- Perform feature engineering steps.
- Create the logistic regression classifier.
- Build the pipeline model that uses all feature engineering steps and the model.
- Train the pipeline model using the training dataset.

In [31]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
categoricalCols = [field for (field, dataType) in df_filtered.dtypes
                   if dataType == "string"]
categoricalCols

['ever_married', 'work_type', 'Residence_type']

In [32]:
indexOutputCols = [x + "_Index" for x in categoricalCols]
indexOutputCols

['ever_married_Index', 'work_type_Index', 'Residence_type_Index']

In [33]:
oheOutputCols = [x + "_OHE" for x in categoricalCols]
oheOutputCols

['ever_married_OHE', 'work_type_OHE', 'Residence_type_OHE']

In [34]:
stringIndexer = StringIndexer(inputCols=categoricalCols,
                             outputCols=indexOutputCols,
                             handleInvalid='skip')
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)

In [35]:
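# Only double-typed columns are picked up, so the integer hypertension and heart_disease
# flags (and id) are not used as features; stroke is the label.
numericCols = [field for (field, dataType) in df_filtered.dtypes
               if dataType == 'double' and field != 'stroke']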
numericCols

['Gender', 'age', 'avg_glucose_level', 'Bmi', 'smoking_status']

In [36]:
assemblerInputs = oheOutputCols + numericCols
assemblerInputs

['ever_married_OHE',
 'work_type_OHE',
 'Residence_type_OHE',
 'Gender',
 'age',
 'avg_glucose_level',
 'Bmi',
 'smoking_status']

In [37]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [38]:
assembler = VectorAssembler(inputCols=assemblerInputs,
                            outputCol="features")

In [39]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='stroke',featuresCol='features')

In [40]:
stages = [stringIndexer, oheEncoder,assembler, lr]

In [41]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(train)

In [42]:
predDF = pipelineModel.transform(test)

In [43]:
predDF.show(5)

+---+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+------------------+---------------+--------------------+----------------+-------------+------------------+--------------------+--------------------+--------------------+----------+
| id|Gender| age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level| Bmi|smoking_status|stroke|ever_married_Index|work_type_Index|Residence_type_Index|ever_married_OHE|work_type_OHE|Residence_type_OHE|            features|       rawPrediction|         probability|prediction|
+---+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+------------------+---------------+--------------------+----------------+-------------+------------------+--------------------+--------------------+--------------------+----------+
| 84|   1.0|55.0|           0|            0|         Yes|  Private|         

### Perform predictions on the test dataframe:
- Test the model using the test dataframe.
- Select and show the features column.
- Print the schema of the output dataframe.
- Select and show both prediction and label columns.
- Explore the results for the label column stroke=1, i.e. select both columns (prediction, stroke) for stroke=1. <b>Notice the result.</b>
- Count the predicted 1 and 0 values. <b>Notice the result.</b>

### Evaluate the model performance
- Use <b>BinaryClassificationEvaluator</b>. By default, this calculates the area under the <b>ROC</b> curve.
- Set the parameters as follows:
    - <b>rawPredictionCol='prediction'</b> and <b>labelCol='stroke'</b>

In [44]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [45]:
evaluatorROC = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                             labelCol='stroke')

In [46]:
evaluatorROC.evaluate(predDF)

0.4994475138121547
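With `rawPredictionCol='prediction'`, the evaluator ranks rows by the hard 0/1 prediction instead of a continuous score. The ROC curve then has a single interior point, and its area should reduce to (TPR + TNR) / 2, i.e. the balanced accuracy. That would explain why a model that predicts nearly every row as 0 scores about 0.5, as above. The pure-Python sketch below illustrates this; it is not Spark's implementation. It computes AUC with the rank (Mann-Whitney) formulation and compares it with balanced accuracy on toy labels. Leaving `rawPredictionCol` at its default, `'rawPrediction'`, would let Spark rank by the model's continuous scores instead.

```python
def roc_auc(labels, scores):
    """AUC as P(score_pos > score_neg) + 0.5 * P(score_pos == score_neg)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(pos) * len(neg))


def balanced_accuracy(labels, preds):
    """(TPR + TNR) / 2 for hard 0/1 predictions."""
    tp = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 1)
    tn = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 0)
    n_pos = sum(1 for y in labels if y == 1)
    n_neg = len(labels) - n_pos
    return 0.5 * (tp / n_pos + tn / n_neg)


labels = [1, 1, 0, 0, 0, 0]
hard_preds = [0, 0, 0, 0, 0, 1]  # toy model: misses both positives, one false positive
print(roc_auc(labels, hard_preds), balanced_accuracy(labels, hard_preds))  # 0.375 0.375
```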

### Improve Model Performance
- You may have noticed that <b>the target label is imbalanced</b>.
- The LogisticRegression classifier has a parameter, <b>weightCol</b>, that can be used to deal with imbalanced classes.
- To use this parameter, your training dataframe must contain the column named by <b>weightCol</b>.
- To create this column, define a <b>UDF</b> and apply it to the target label column.
- Create a LogisticRegression classifier with the <b>weightCol</b> parameter.
- Build and train a pipeline model with the new LogisticRegression.
- Perform the prediction on the test dataframe.
- Select and show both prediction and label columns.
- Explore the results for the label column stroke=1, i.e. select both columns (prediction, stroke) for stroke=1. <b>Notice the result.</b>
- Count the predicted 1 and 0 values. <b>Notice the result.</b>
- Evaluate the model performance exactly as in the previous step.
### YOU SHOULD OBTAIN AUC (AREA UNDER ROC) > 0.65
<BR>
<b><font color='red'>Note: feel free to deal with the imbalanced classes with the above method or another one. However, you may only use methods that operate on a Spark DataFrame.</font></b>

In [47]:
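total = df_filtered.count()
# Look the counts up by label, because the row order of groupBy().count().collect() is not guaranteed.
class_counts = {row['stroke']: row['count'] for row in df_filtered.groupBy('stroke').count().collect()}
# weight = 1 - (share of the class), so the rare stroke=1 class gets the large weight
weight_1 = 1 - class_counts[1] / total
weight_0 = 1 - class_counts[0] / total
weight_0

0.05025329280648427

In [48]:
from pyspark.sql.functions import udf
stroke_weight_map = {0: weight_0, 1: weight_1}

@udf(returnType=DoubleType())
def impute_weight(stroke):
    return stroke_weight_map[stroke]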

In [54]:
train=train.withColumn("weight",impute_weight(train["stroke"]))
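The weights attached above give each class one minus its share of the rows. The pure-Python sketch below compares that rule with a common alternative, often called "balanced" weighting: total / (number of classes * class count). The alternative is an assumption here, not something the task requires, and the counts are hypothetical. With two classes, both schemes give the two classes the same total weight (weight times row count), which is what counteracts the imbalance.

```python
def inverse_share_weights(counts):
    """Weight per class = 1 - (class count / total), the rule used in the notebook."""
    total = sum(counts.values())
    return {label: 1.0 - n / total for label, n in counts.items()}


def balanced_weights(counts):
    """Alternative: total / (number of classes * class count)."""
    total = sum(counts.values())
    return {label: total / (len(counts) * n) for label, n in counts.items()}


counts = {0: 950, 1: 50}  # hypothetical class counts for illustration
for weights in (inverse_share_weights(counts), balanced_weights(counts)):
    # Total weight carried by each class = weight * number of rows in that class.
    per_class_total = {label: weights[label] * counts[label] for label in counts}
    print(weights, per_class_total)
```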

In [55]:
train.show()

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+------------------+--------------+------+-------------------+
| id|Gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|               Bmi|smoking_status|stroke|             weight|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+------------------+--------------+------+-------------------+
| 67|   0.0|17.0|           0|            0|          No|      Private|         Urban|            92.97|29.257489451476793|          0.75|     0|0.05025329280648427|
| 77|   0.0|13.0|           0|            0|          No|     children|         Rural|            85.81|              18.6|           0.5|     0|0.05025329280648427|
| 91|   0.0|42.0|           0|            0|          No|      Private|         Urban|            98.53|              18.5|           0.0|     0|0.05025329280648427|
| 99

In [56]:
lr = LogisticRegression(labelCol='stroke',featuresCol='features',weightCol="weight")

In [57]:
stages = [stringIndexer, oheEncoder,assembler, lr]

In [58]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(train)

In [59]:
predDF = pipelineModel.transform(test)

I got an error here, which I think was caused by an installation problem on Windows. Update: I switched to Colab to continue the exam.

In [60]:
evaluatorROC.evaluate(predDF)

0.7553336166595834

# GOOD LUCK
<b><font color='GREEN'>AI-PRO Spark Team ITI</font></b>

![image-3.png](attachment:image-3.png)