In [9]:
from pyspark.sql import SparkSession, types
class font:
   BOLD = '\033[1m'
   END = '\033[0m'

spark = SparkSession.builder.master("local").getOrCreate()
df_json = spark.read.json('/Users/willdowdy/Desktop/Data_Options/stroke_data.json')

print(font.BOLD + 'Top Row of DataFrame' + font.END)
df_json.show(1)
print(font.BOLD + 'Record Count Pre-drop: ' + font.END, df_json.count())
df_json = df_json.na.drop()
print(font.BOLD + 'Record Count Post-drop: ' + font.END, df_json.count())

import pyspark.sql.functions as F 
categ = df_json.select('work_type').distinct().rdd.flatMap(lambda x:x).collect()
exprs = [F.when(F.col('work_type') == cat,1).otherwise(0)\
            .alias(str(cat)) for cat in categ]
df_json = df_json.select(exprs+df_json.columns)

print(font.BOLD + '\nTransforming Categorical Variables/\nTesting Original Column Against New Columns to Validate Consistency:' + font.END)
df_json.select(['work_type','Private','Self-employed','Govt_job','children','Never_worked']).show(5)

categ = df_json.select('smoking_status').distinct().rdd.flatMap(lambda x:x).collect()
exprs = [F.when(F.col('smoking_status') == cat,1).otherwise(0)\
            .alias(str(cat)) for cat in categ]
df_json = df_json.select(exprs+df_json.columns)

df_json.select(['smoking_status','smokes','Unknown','never smoked','formerly smoked']).show(5)

df_json = df_json.withColumn('Urban_Else_Rural', F.when(F.col("Residence_type") == 'Rural', 0).otherwise(1))
df_json = df_json.withColumn('Male_Else_Female', F.when(F.col("gender") == 'Female', 0).otherwise(1))
df_json = df_json.withColumn('Ever_Married_2', F.when(F.col("ever_married") == 'No', 0).otherwise(1))

df_json.select('Urban_Else_Rural', 'Residence_type','Male_Else_Female','gender','Ever_Married_2','ever_married').show(5)



[1mTop Row of DataFrame[0m
+--------------+----+-----------------+----+------------+------+-------------+------------+----+---------------+------+---------+
|Residence_type| age|avg_glucose_level| bmi|ever_married|gender|heart_disease|hypertension|  id| smoking_status|stroke|work_type|
+--------------+----+-----------------+----+------------+------+-------------+------------+----+---------------+------+---------+
|         Urban|67.0|           228.69|36.6|         Yes|  Male|            1|           0|9046|formerly smoked|     1|  Private|
+--------------+----+-----------------+----+------------+------+-------------+------------+----+---------------+------+---------+
only showing top 1 row

[1mRecord Count Pre-drop: [0m 5110
[1mRecord Count Post-drop: [0m 4909
[1m
Transforming Categorical Variables/
Testing Original Column Against New Columns to Validate Consistency:[0m
+-------------+-------+-------------+--------+--------+------------+
|    work_type|Private|Self-employed|Go

In [10]:
df_json = df_json.drop('work_type','smoking_status','id','Residence_type','gender','ever_married')
df_json = df_json.select([F.col(c).cast("double") for c in df_json.columns])

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

scale_df = df_json.select('age','avg_glucose_level','bmi')

unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

for i in df_json.columns:
    if i in ['age','bmi','avg_glucose_level']:
        assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
        scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
        pipeline = Pipeline(stages=[assembler, scaler])
        df_json = pipeline.fit(df_json).transform(df_json).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")
        
print(font.BOLD + "Normalizing Features of Varied Scales/Magnitudes - Setting Scale Between 0 and 1" + font.END) 
df_json.select('age','age_Scaled','bmi','bmi_Scaled','avg_glucose_level','avg_glucose_level_Scaled').show(5)

df_json = df_json.drop('age','avg_glucose_level','bmi')

[1mNormalizing Features of Varied Scales/Magnitudes - Setting Scale Between 0 and 1[0m
+----+----------+----+----------+-----------------+------------------------+
| age|age_Scaled| bmi|bmi_Scaled|avg_glucose_level|avg_glucose_level_Scaled|
+----+----------+----+----------+-----------------+------------------------+
|67.0|     0.817|36.6|     0.301|           228.69|                   0.801|
|80.0|     0.976|32.5|     0.254|           105.92|                   0.235|
|49.0|     0.597|34.4|     0.276|           171.23|                   0.536|
|79.0|     0.963|24.0|     0.157|           174.12|                   0.549|
|81.0|     0.988|29.0|     0.214|           186.21|                   0.605|
+----+----------+----+----------+-----------------+------------------------+
only showing top 5 rows



In [16]:
from pyspark.ml.linalg import Vectors

test_df = df_json.select('formerly smoked','age_Scaled','Urban_Else_Rural','Male_Else_Female','stroke')
assemblerInputs = ['formerly smoked','age_Scaled','Urban_Else_Rural','Male_Else_Female']
vector_assembler = VectorAssembler(inputCols = assemblerInputs, outputCol = 'features')
assembler_temp = vector_assembler.transform(test_df)

assembler = assembler_temp.drop('formerly smoked','age_Scaled','Urban_Else_Rural','Male_Else_Female')

from pyspark.ml.feature import StringIndexer

label_indexer = StringIndexer(inputCol = "stroke", outputCol = "label")
ml_dataset = label_indexer.fit(assembler).transform(assembler)
temp = ml_dataset.select('stroke','label','features')

zeros = temp.filter("label==0.0")
ones = temp.filter("label==1.0")
from pyspark.sql.functions import rand
zeros = zeros.orderBy(rand()).limit(487)
unionDF = zeros.union(ones).orderBy(rand())

training, test = unionDF.randomSplit([0.8,0.2])

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

rf = RandomForestClassifier(labelCol = "label", featuresCol= "features")
bc_evaluator = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction")
rfparamGrid = (ParamGridBuilder().addGrid(rf.maxDepth, [2, 5, 10]).addGrid(rf.maxBins, [5, 10, 20]).addGrid(rf.numTrees, [5, 20, 50]).build())

rfcv = CrossValidator(estimator = rf, estimatorParamMaps = rfparamGrid, evaluator = bc_evaluator, numFolds = 10)

rfcvModel = rfcv.fit(training)
rfpredictions = rfcvModel.transform(test)
print(font.BOLD + "Running Cross-Validated Random Forest Model" + font.END)
print('AUC Score:', bc_evaluator.evaluate(rfpredictions))


[1mRunning Cross-Validated Random Forest Model[0m
AUC Score: 0.7835349092908191
