In [8]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

In [9]:
import os
spark_version = 'spark-3.4.1'
os.environ['SPARK_VERSION']=spark_version
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark
# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"
# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done


In [10]:
# Create the Spark session used by the rest of the notebook (or reuse a running one)
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("Stroke_proccessing")
spark = session_builder.getOrCreate()

In [11]:

# Register the CSV with the Spark context, then read it using its header row
# for column names. No schema is supplied or inferred here.
csv_path = "full_data.csv"
spark.sparkContext.addFile(csv_path)
df_raw = spark.read.csv(
    csv_path,
    sep=",",
    header=True,
    ignoreLeadingWhiteSpace=True,
)

# Preview the loaded rows
df_raw.show()

+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|     1|
|Female|79.0|           1|            0|         Yes|Self-employed|         Rural|           174.12|24.0|   never smoked|     1|
|  Male|81.0|           0|            0|         Yes|      Private|         Urban|           186.

In [12]:
# Inspect the column types Spark assigned when reading the CSV.
df_raw.schema

StructType([StructField('gender', StringType(), True), StructField('age', StringType(), True), StructField('hypertension', StringType(), True), StructField('heart_disease', StringType(), True), StructField('ever_married', StringType(), True), StructField('work_type', StringType(), True), StructField('Residence_type', StringType(), True), StructField('avg_glucose_level', StringType(), True), StructField('bmi', StringType(), True), StructField('smoking_status', StringType(), True), StructField('stroke', StringType(), True)])

In [13]:
from pyspark.ml.feature import StringIndexer

# Categorical string columns, each replaced by a numeric "<name>_num" index column.
categorical_cols = ['gender','ever_married', 'work_type', 'Residence_type', 'smoking_status']
indexed_cols = ['gender_num','ever_married_num', 'work_type_num', 'Residence_type_num', 'smoking_status_num']

indexer = StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols)
indexer_fitted = indexer.fit(df_raw)

# Add the index columns, then discard the original string columns.
df_indexed = indexer_fitted.transform(df_raw).drop(*categorical_cols)
df_indexed.show()

+----+------------+-------------+-----------------+----+------+----------+----------------+-------------+------------------+------------------+
| age|hypertension|heart_disease|avg_glucose_level| bmi|stroke|gender_num|ever_married_num|work_type_num|Residence_type_num|smoking_status_num|
+----+------------+-------------+-----------------+----+------+----------+----------------+-------------+------------------+------------------+
|67.0|           0|            1|           228.69|36.6|     1|       1.0|             0.0|          0.0|               0.0|               2.0|
|80.0|           0|            1|           105.92|32.5|     1|       1.0|             0.0|          0.0|               1.0|               0.0|
|49.0|           0|            0|           171.23|34.4|     1|       0.0|             0.0|          0.0|               0.0|               3.0|
|79.0|           1|            0|           174.12|24.0|     1|       0.0|             0.0|          1.0|               1.0|            

In [14]:
# One-hot encode the indexed categorical columns into vector columns
from pyspark.ml.feature import OneHotEncoder

index_cols = ['gender_num','ever_married_num', 'work_type_num', 'Residence_type_num', 'smoking_status_num']
vector_cols = ['gender_vec','ever_married_vec', 'work_type_vec', 'Residence_type_vec', 'smoking_status_vec']

# dropLast=False keeps a slot for every category instead of dropping the last one.
encoder = OneHotEncoder(inputCols=index_cols, outputCols=vector_cols, dropLast=False)
encoder_fitted = encoder.fit(df_indexed)

# Add the vector columns, then discard the intermediate index columns.
df_onehot_vec = encoder_fitted.transform(df_indexed).drop(*index_cols)
df_onehot_vec.show()

+----+------------+-------------+-----------------+----+------+-------------+----------------+-------------+------------------+------------------+
| age|hypertension|heart_disease|avg_glucose_level| bmi|stroke|   gender_vec|ever_married_vec|work_type_vec|Residence_type_vec|smoking_status_vec|
+----+------------+-------------+-----------------+----+------+-------------+----------------+-------------+------------------+------------------+
|67.0|           0|            1|           228.69|36.6|     1|(2,[1],[1.0])|   (2,[0],[1.0])|(4,[0],[1.0])|     (2,[0],[1.0])|     (4,[2],[1.0])|
|80.0|           0|            1|           105.92|32.5|     1|(2,[1],[1.0])|   (2,[0],[1.0])|(4,[0],[1.0])|     (2,[1],[1.0])|     (4,[0],[1.0])|
|49.0|           0|            0|           171.23|34.4|     1|(2,[0],[1.0])|   (2,[0],[1.0])|(4,[0],[1.0])|     (2,[0],[1.0])|     (4,[3],[1.0])|
|79.0|           1|            0|           174.12|24.0|     1|(2,[0],[1.0])|   (2,[0],[1.0])|(4,[1],[1.0])|     (2,[1

In [15]:
# Convert each one-hot vector column into a plain array column named
# "<name>_onehot", replacing the original vector column.
from pyspark.ml.functions import vector_to_array

col_list = ['gender_vec', 'ever_married_vec', 'work_type_vec', 'Residence_type_vec', 'smoking_status_vec']

# The loop variable is deliberately not called `col`: this notebook imports
# pyspark.sql.functions.col under that name, and rebinding it to a string
# would break any later call to col(...) if this cell is re-run.
for vec_col in col_list:
    # Skip columns that were already converted, so re-running this cell is a
    # no-op instead of failing on a vector column that has been dropped.
    if vec_col not in df_onehot_vec.columns:
        continue
    df_onehot_vec = df_onehot_vec.withColumn(vec_col + '_onehot', vector_to_array(vec_col))
    df_onehot_vec = df_onehot_vec.drop(vec_col)


In [16]:
# Preview the frame after the one-hot vector columns were converted to arrays.
df_onehot_vec.show()

+----+------------+-------------+-----------------+----+------+-----------------+-----------------------+--------------------+-------------------------+-------------------------+
| age|hypertension|heart_disease|avg_glucose_level| bmi|stroke|gender_vec_onehot|ever_married_vec_onehot|work_type_vec_onehot|Residence_type_vec_onehot|smoking_status_vec_onehot|
+----+------------+-------------+-----------------+----+------+-----------------+-----------------------+--------------------+-------------------------+-------------------------+
|67.0|           0|            1|           228.69|36.6|     1|       [0.0, 1.0]|             [1.0, 0.0]|[1.0, 0.0, 0.0, 0.0]|               [1.0, 0.0]|     [0.0, 0.0, 1.0, 0.0]|
|80.0|           0|            1|           105.92|32.5|     1|       [0.0, 1.0]|             [1.0, 0.0]|[1.0, 0.0, 0.0, 0.0]|               [0.0, 1.0]|     [1.0, 0.0, 0.0, 0.0]|
|49.0|           0|            0|           171.23|34.4|     1|       [1.0, 0.0]|             [1.0, 0.0]|

In [17]:
from pyspark.sql.functions import col

# Continuous measurements that should be doubles; every other column is passed
# through unchanged and the column order is preserved.
columns_to_cast = ['age', 'avg_glucose_level', 'bmi']
df_onehot_vec = df_onehot_vec.select(
    [
        col(name).cast("Double").alias(name) if name in columns_to_cast else col(name)
        for name in df_onehot_vec.columns
    ]
)
df_onehot_vec.schema

StructType([StructField('age', DoubleType(), True), StructField('hypertension', StringType(), True), StructField('heart_disease', StringType(), True), StructField('avg_glucose_level', DoubleType(), True), StructField('bmi', DoubleType(), True), StructField('stroke', StringType(), True), StructField('gender_vec_onehot', ArrayType(DoubleType(), False), False), StructField('ever_married_vec_onehot', ArrayType(DoubleType(), False), False), StructField('work_type_vec_onehot', ArrayType(DoubleType(), False), False), StructField('Residence_type_vec_onehot', ArrayType(DoubleType(), False), False), StructField('smoking_status_vec_onehot', ArrayType(DoubleType(), False), False)])

In [18]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Pack the three continuous columns into a single vector column.
numeric_cols = ["age", "avg_glucose_level", "bmi"]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="num_features")
df_assembled = assembler.transform(df_onehot_vec)

# Standardize the packed vector: center on the mean and scale by the standard deviation.
scaler = StandardScaler(
    inputCol="num_features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True,
)
scaler_model = scaler.fit(df_assembled)

# Keep only the scaled vector: drop the raw columns and the intermediate vector.
columns_to_drop = numeric_cols + ["num_features"]
scaled_data = scaler_model.transform(df_assembled).drop(*columns_to_drop)
scaled_data.show()


+------------+-------------+------+-----------------+-----------------------+--------------------+-------------------------+-------------------------+--------------------+
|hypertension|heart_disease|stroke|gender_vec_onehot|ever_married_vec_onehot|work_type_vec_onehot|Residence_type_vec_onehot|smoking_status_vec_onehot|     scaled_features|
+------------+-------------+------+-----------------+-----------------------+--------------------+-------------------------+-------------------------+--------------------+
|           0|            1|     1|       [0.0, 1.0]|             [1.0, 0.0]|[1.0, 0.0, 0.0, 0.0]|               [1.0, 0.0]|     [0.0, 0.0, 1.0, 0.0]|[1.04047987357880...|
|           0|            1|     1|       [0.0, 1.0]|             [1.0, 0.0]|[1.0, 0.0, 0.0, 0.0]|               [0.0, 1.0]|     [1.0, 0.0, 0.0, 0.0]|[1.61410827655648...|
|           0|            0|     1|       [1.0, 0.0]|             [1.0, 0.0]|[1.0, 0.0, 0.0, 0.0]|               [1.0, 0.0]|     [0.0, 0.0, 

In [22]:
# Show the first 5 rows without truncating cell contents, so the full
# scaled_features vector is visible.
scaled_data.show(5, truncate=False)

+------------+-------------+------+-----------------+-----------------------+--------------------+-------------------------+-------------------------+-----------------------------------------------------------+
|hypertension|heart_disease|stroke|gender_vec_onehot|ever_married_vec_onehot|work_type_vec_onehot|Residence_type_vec_onehot|smoking_status_vec_onehot|scaled_features                                            |
+------------+-------------+------+-----------------+-----------------------+--------------------+-------------------------+-------------------------+-----------------------------------------------------------+
|0           |1            |1     |[0.0, 1.0]       |[1.0, 0.0]             |[1.0, 0.0, 0.0, 0.0]|[1.0, 0.0]               |[0.0, 0.0, 1.0, 0.0]     |[1.040479873578807,2.7231375098735775,1.193118377221051]   |
|0           |1            |1     |[0.0, 1.0]       |[1.0, 0.0]             |[1.0, 0.0, 0.0, 0.0]|[0.0, 1.0]               |[1.0, 0.0, 0.0, 0.0]     |[1.614