#CIS 5560 Term Project
Checking the python version

In [2]:
import sys
print('Python: {}'.format(sys.version))

### Import Spark SQL and Spark ML Libraries

First, import the libraries you will need:

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

import numpy as np
from pyspark.sql import functions as F
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import GBTRegressor

### Load Source Data

1. After youth_risk_behaviours_surveillance.csv file is added to the data of the left frame, create a table using the UI, especially, "Upload File"
2. Click "Preview Table to view the table" and check the option of youth_risk_behaviours_surveillance.csv has a header as the first row: "First line is header"
3. Change the data type of the table columns if needed

The following should show your table name you created with inferSchmea option marked when you create it

In [7]:
#%fs ls /FileStore/tables/youth_risk_behaviours_surveillance.csv

### Prepare the Data
Select a subset of columns to use as *features* and label to predict. Here the prediction is done over 'Greater Risk data value'.

In [9]:
csv1 = spark.sql("SELECT * FROM youth_risk_behaviours_surveillance_csv")
csv1.createOrReplaceTempView("table1")
csv1.show(5)


In [10]:
#show topics that can be used for prediction

df1 = spark.sql("SELECT count(*), Topic from table1 group by Topic")
df1.show(20,False)


# Data Cleaning

1. Filter the rows of topic that needs to be predicted
2. Drop the rows where value of label is null or ''.
3. Clip the values of columns of int, double type to where value is below 1 or above 99 percentile.

In [12]:
df2 = spark.sql("SELECT * from table1 where Topic = 'Obesity, Overweight, and Weight Control'")
df3 = df2.filter(df2.Greater_Risk_Data_Value.isNotNull())
df3.show(5, False)


### Select features and label

1. Select the relevant columns in a new dataframe and define the label.
2. Convert the string type columns into indexes using StringIndexer
3. Split the data in 70-30 train-test ratio.
4. Store the train-test data for both of the models separately.
5. Using Vector Assembler define the set of columns to be used as features.

In [14]:
data = df3.select("YEAR", "LocationDesc", "Sex", "Race", "Grade", "SubTopicID", "QuestionCode", "LocationId", "StratID1", col("Greater_Risk_Data_Value").alias("label"))
data.show(5, False)

data = StringIndexer(inputCol='LocationDesc', outputCol='LocationDesc'+"_index").fit(data).transform(data)
data = StringIndexer(inputCol='Sex', outputCol='Sex'+"_index").fit(data).transform(data)
data = StringIndexer(inputCol='Race', outputCol='Race'+"_index").fit(data).transform(data)
data = StringIndexer(inputCol='Grade', outputCol='Grade'+"_index").fit(data).transform(data)
data = StringIndexer(inputCol='SubTopicID', outputCol='SubTopicID'+"_index").fit(data).transform(data)
data = StringIndexer(inputCol='QuestionCode', outputCol='QuestionCode'+"_index").fit(data).transform(data)
data = StringIndexer(inputCol='LocationId', outputCol='LocationId'+"_index").fit(data).transform(data)
data = StringIndexer(inputCol='StratID1', outputCol='StratID1'+"_index").fit(data).transform(data)

# show indexed columns
data.show(5, False)

# Split the data
splits = data.randomSplit([0.7, 0.3])


# for linear regression
lr_train = splits[0]
lr_test = splits[1].withColumnRenamed("label", "trueLabel")


# for decision tree regression
dt_train = splits[0]
dt_test = splits[1].withColumnRenamed("label", "trueLabel")

In [15]:
assembler = VectorAssembler(inputCols = ["YEAR", "LocationDesc_index", "Sex_index", "Race_index", "Grade_index", "SubTopicID_index", "QuestionCode_index", "LocationId_index", "StratID1_index"], outputCol="features")

##Linear Regression Model

In [17]:
lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=12345, regParam=0.4, elasticNetParam=0.7)
lr_pipeline = Pipeline(stages=[assembler, lr])

In [18]:
lr_model = lr_pipeline.fit(lr_train)
lr_prediction = lr_model.transform(lr_test)

Root Mean square error and Accuracy for linear regression

In [20]:
lr_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = lr_evaluator.evaluate(lr_prediction)
print("RMSE for linear regression is: %.4f" % rmse)

lr_accuracy = lr_evaluator.evaluate(lr_prediction, {lr_evaluator.metricName: "r2"})
print("linear regression model accuracy is: %.3f" %(lr_accuracy*100))


Compare predicted values under 'prediction' column with actual values under 'trueLabel'

In [22]:
lr_predicted = lr_prediction.select("features", "prediction", "trueLabel")
lr_predicted.show()

##Decision Tree Regression Model

In [24]:
dt = DecisionTreeRegressor(featuresCol='features', labelCol='label', maxBins=91)
dt_pipeline = Pipeline(stages=[assembler, dt])

Cross Validation

In [26]:
paramGrid = ParamGridBuilder().build()
cv = CrossValidator(estimator=dt_pipeline, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid, numFolds=10)

dt_train.show()
dt_model = cv.fit(dt_train)

Compare predicted values under 'prediction' column with actual values under 'trueLabel'

In [28]:
dt_prediction = dt_model.transform(dt_test)
dt_predicted = dt_prediction.select("features", "prediction", "trueLabel")
dt_predicted.show()

Root Mean square error and Accuracy for decision tree regression

In [30]:
dt_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
dt_rmse = dt_evaluator.evaluate(dt_prediction)
print ("Root Mean Square Error (RMSE):", dt_rmse)


In [31]:
dt_accuracy_evaluator=RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction",metricName="r2")
dt_accuracy=dt_accuracy_evaluator.evaluate(dt_prediction)
print("accuracy  on test data=%g" % (dt_accuracy*100))