In [1]:
from pyspark import SparkContext
sc = SparkContext(master = 'local')



In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
          .appName("Python Spark SQL basic example") \
          .config("spark.some.config.option", "some-value") \
          .getOrCreate()

In [3]:
import time as t

cuse = spark.read.csv('D:\COURSE PDFs\College Notes\SEMESTER VI\Big Data Analytics\Healthcare Stroke Dataset\data.csv', header=True, inferSchema=True)
cuse.show(20)
start_time=t.time()

+-----+------+----+------------+-------------+------------+---------+--------------+-----------------+--------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level|smoking_status|stroke|
+-----+------+----+------------+-------------+------------+---------+--------------+-----------------+--------------+------+
| 9046|     1|67.0|           0|            1|           1|        1|             1|           228.69|             1|     1|
|51676|     0|61.0|           0|            0|           1|        2|             2|           202.21|             0|     1|
|31112|     1|80.0|           0|            1|           1|        1|             2|           105.92|             0|     1|
|60182|     0|49.0|           0|            0|           1|        1|             1|           171.23|             2|     1|
| 1665|     0|79.0|           1|            0|           1|        2|             2|           174.12|             0|     1|


In [4]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# categorical columns
categorical_columns = cuse.columns[0:3]

In [5]:
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c) for c in categorical_columns]
stringindexer_stages += [StringIndexer(inputCol='stroke', outputCol='label')]

In [6]:
onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]


In [7]:
feature_columns = ['onehot_' + c for c in categorical_columns]
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [8]:
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)

In [9]:
pipeline_model = pipeline.fit(cuse)


In [10]:
final_columns = feature_columns + ['features', 'label']
cuse_df = pipeline_model.transform(cuse).\
            select(final_columns)
            
cuse_df.show(10)

+-------------------+-------------+----------------+--------------------+-----+
|          onehot_id|onehot_gender|      onehot_age|            features|label|
+-------------------+-------------+----------------+--------------------+-----+
|(5109,[5053],[1.0])|(2,[1],[1.0])|(103,[63],[1.0])|(5214,[5053,5110,...|  1.0|
|(5109,[3218],[1.0])|(2,[0],[1.0])|(103,[15],[1.0])|(5214,[3218,5109,...|  1.0|
|(5109,[1593],[1.0])|(2,[1],[1.0])|(103,[28],[1.0])|(5214,[1593,5110,...|  1.0|
|(5109,[3907],[1.0])|(2,[0],[1.0])|(103,[12],[1.0])|(5214,[3907,5109,...|  1.0|
| (5109,[530],[1.0])|(2,[0],[1.0])| (103,[7],[1.0])|(5214,[530,5109,5...|  1.0|
|(5109,[3618],[1.0])|(2,[1],[1.0])|(103,[41],[1.0])|(5214,[3618,5110,...|  1.0|
|(5109,[3392],[1.0])|(2,[1],[1.0])|(103,[74],[1.0])|(5214,[3392,5110,...|  1.0|
|  (5109,[29],[1.0])|(2,[0],[1.0])|(103,[55],[1.0])|(5214,[29,5109,51...|  1.0|
|(5109,[1306],[1.0])|(2,[0],[1.0])|(103,[10],[1.0])|(5214,[1306,5109,...|  1.0|
|(5109,[3931],[1.0])|(2,[0],[1.0])| (103

In [11]:
training, test = cuse_df.randomSplit([0.8, 0.2], seed=1234)


In [12]:
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [13]:
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(dt.maxDepth, [2,3,4,5]).\
    build()

In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")

In [15]:
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)

In [16]:
cv_model = cv.fit(cuse_df)


In [17]:
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']


In [18]:
pred_training_cv = cv_model.transform(training)
pred_training_cv.select(show_columns).show(10, truncate=False)

+-----------------------------------+-----+----------+--------------+----------------------------------------+
|features                           |label|prediction|rawPrediction |probability                             |
+-----------------------------------+-----+----------+--------------+----------------------------------------+
|(5214,[5110,5154],[1.0,1.0])       |0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|(5214,[1,5109,5125],[1.0,1.0,1.0]) |0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|(5214,[2,5109,5125],[1.0,1.0,1.0]) |0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|(5214,[4,5110,5146],[1.0,1.0,1.0]) |0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|(5214,[5,5109,5125],[1.0,1.0,1.0]) |0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|(5214,[6,5109,5132],[1.0,1.0,1.0]) |0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|

In [19]:
pred_test_cv = cv_model.transform(test)
pred_test_cv.select(show_columns).show(10, truncate=False)

+-----------------------------------+-----+----------+--------------+----------------------------------------+
|features                           |label|prediction|rawPrediction |probability                             |
+-----------------------------------+-----+----------+--------------+----------------------------------------+
|(5214,[0,5110,5172],[1.0,1.0,1.0]) |0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|(5214,[3,5110,5118],[1.0,1.0,1.0]) |0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|(5214,[20,5109,5116],[1.0,1.0,1.0])|0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|(5214,[23,5110,5113],[1.0,1.0,1.0])|0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|(5214,[24,5109,5160],[1.0,1.0,1.0])|0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|(5214,[25,5109,5133],[1.0,1.0,1.0])|0.0  |0.0       |[4808.0,232.0]|[0.953968253968254,0.046031746031746035]|
|

In [20]:
end_time=t.time()

In [21]:
end_time-start_time


27.774269342422485