In [1]:
# SparkContext
from pyspark import SparkConf, SparkContext

# Build the same configuration the positional form used (master='local',
# appName='firstapp'), but obtain the context through getOrCreate() so that
# re-running this cell in a live kernel reuses the running context instead of
# raising "Cannot run multiple SparkContexts at once".
spark_conf = SparkConf().setMaster('local').setAppName('firstapp')
sc = SparkContext.getOrCreate(spark_conf)
print(sc)
print(sc.version)

<SparkContext master=local appName=firstapp>
2.4.4


In [2]:
from pyspark.sql import SparkSession

# Obtain the session through the builder's getOrCreate() and print its repr.
session_builder = SparkSession.builder
my_spark = session_builder.getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x7f3ece6d1d68>


In [3]:
# List the tables currently registered in the session catalog
# (nothing has been registered at this point in the notebook).
my_spark.catalog.listTables()

[]

In [4]:

# Don't change this file path
file_path = "clean_diabetes.csv"

# Read in the diabetes data; header=True takes the column names from the
# first row of the file.
diabetes = my_spark.read.csv(file_path,header=True)

# Show the data. show() prints the table itself and returns None, so it is
# called directly rather than wrapped in print() (which would emit a stray
# "None" after the table).
diabetes.show()

+-------+----+---+-------+
|Glucose| BMI|Age|Outcome|
+-------+----+---+-------+
|    148|33.6| 50|      1|
|     85|26.6| 31|      0|
|    183|23.3| 32|      1|
|     89|28.1| 21|      0|
|    137|43.1| 33|      1|
|    116|25.6| 30|      0|
|     78|31.0| 26|      1|
|    115|35.3| 29|      0|
|    197|30.5| 53|      1|
|    110|37.6| 30|      0|
|    168|38.0| 34|      1|
|    139|27.1| 57|      0|
|    189|30.1| 59|      1|
|    166|25.8| 51|      1|
|    100|30.0| 32|      1|
|    118|45.8| 31|      1|
|    107|29.6| 31|      1|
|    103|43.3| 33|      0|
|    115|34.6| 32|      1|
|    126|39.3| 27|      0|
+-------+----+---+-------+
only showing top 20 rows

None


In [5]:
# Register the DataFrame as a temporary view named 'Diabetes' so it shows up
# in the session catalog.
diabetes.createOrReplaceTempView('Diabetes')

In [6]:
# List the catalog again to confirm the temporary view is now registered.
my_spark.catalog.listTables()

[Table(name='diabetes', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [7]:
# Print the leading rows of the DataFrame as a text table.
diabetes.show()

+-------+----+---+-------+
|Glucose| BMI|Age|Outcome|
+-------+----+---+-------+
|    148|33.6| 50|      1|
|     85|26.6| 31|      0|
|    183|23.3| 32|      1|
|     89|28.1| 21|      0|
|    137|43.1| 33|      1|
|    116|25.6| 30|      0|
|     78|31.0| 26|      1|
|    115|35.3| 29|      0|
|    197|30.5| 53|      1|
|    110|37.6| 30|      0|
|    168|38.0| 34|      1|
|    139|27.1| 57|      0|
|    189|30.1| 59|      1|
|    166|25.8| 51|      1|
|    100|30.0| 32|      1|
|    118|45.8| 31|      1|
|    107|29.6| 31|      1|
|    103|43.3| 33|      0|
|    115|34.6| 32|      1|
|    126|39.3| 27|      0|
+-------+----+---+-------+
only showing top 20 rows



In [8]:
# describe() returns a new (lazy) summary DataFrame; as the last expression of
# the cell only its schema repr is displayed, not the statistics themselves.
# Chain .show() if the actual summary values are wanted.
diabetes.describe()

DataFrame[summary: string, Glucose: string, BMI: string, Age: string, Outcome: string]

In [33]:
# Convert each column read from the CSV to its numeric Spark type, one
# withColumn() call per column, in the same order as listed here.
column_types = [
    ('Glucose', 'integer'),
    ('BMI', 'double'),
    ('Age', 'integer'),
    ('Outcome', 'integer'),
]
for column_name, spark_type in column_types:
    diabetes = diabetes.withColumn(column_name, diabetes[column_name].cast(spark_type))

# Copy the (now integer) Outcome column into a column named 'label'.
diabetes = diabetes.withColumn('label', diabetes['Outcome'])

In [34]:
# NOTE: the summary DataFrame returned by describe() reports every column as
# string, so this repr does not confirm that the casts above took effect;
# diabetes.dtypes or diabetes.printSchema() would show the actual column types.
diabetes.describe()

DataFrame[summary: string, Glucose: string, BMI: string, Age: string, Outcome: string, label: string]

In [35]:
from pyspark.ml.feature import VectorAssembler

# Assemble the three predictor columns into one column named 'features'.
feature_columns = ['Glucose', 'BMI', 'Age']
vec_assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [36]:
from pyspark.ml import Pipeline

# `stages` lists, in order, every step the data should go through; here the
# only step is the vector assembler.
pipeline_stages = [vec_assembler]
diabetes_pipe = Pipeline(stages=pipeline_stages)

In [37]:
# Fit the pipeline on the full DataFrame, then run that same DataFrame through
# the fitted pipeline.
fitted_pipe = diabetes_pipe.fit(diabetes)
piped_data = fitted_pipe.transform(diabetes)

In [38]:
# Split the assembled data into train/test with weights 0.6 / 0.4.
# A fixed seed makes the random split repeatable, so the model and the metric
# computed from it do not change from one run of the notebook to the next.
train,test = piped_data.randomSplit([.6,.4], seed=42)

In [39]:
from pyspark.ml.classification import LogisticRegression

# Logistic-regression estimator; the feature and label column names are the
# library defaults, spelled out here to show which columns it reads.
lr = LogisticRegression(featuresCol='features', labelCol='label')

In [40]:
import pyspark.ml.evaluation as evals

# Binary-classification evaluator configured to report area under the ROC curve.
metric_name = 'areaUnderROC'
evaluator = evals.BinaryClassificationEvaluator(metricName=metric_name)

In [41]:
import pyspark.ml.tuning as tune
import numpy as np

# Hyperparameter grid for the logistic regression:
#   regParam        -> values from 0 up to (but excluding) 0.1 in steps of 0.01
#   elasticNetParam -> 0 and 1
grid = tune.ParamGridBuilder()
grid = grid.addGrid(lr.regParam,np.arange(0,.1,.01))
grid = grid.addGrid(lr.elasticNetParam,[0,1])

# build() returns the list of parameter maps. Keep that result: the list, not
# the builder object, is what CrossValidator's estimatorParamMaps expects.
grid = grid.build()

In [42]:
# Cross-validator combining the estimator, the parameter grid and the evaluator.
cv = tune.CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)

In [43]:
# Fit the logistic regression on the training split and print the fitted model.
# NOTE(review): this calls lr.fit directly; the CrossValidator `cv` is not used
# here, so despite its name `best_lr` is a single fit with the estimator's
# current parameters, not the winner of a grid search.
best_lr = lr.fit(train)
print(best_lr)

LogisticRegressionModel: uid = LogisticRegression_5632e5065654, numClasses = 2, numFeatures = 3


In [44]:
# Apply the fitted model to the held-out test split.
test_results = best_lr.transform(test)

# Compute the evaluator's metric on those predictions and print it.
test_metric = evaluator.evaluate(test_results)
print(test_metric)

0.7979277085422332
