<a href="http://www.calstatela.edu/centers/hipic"><img align="left" src="https://avatars2.githubusercontent.com/u/4156894?v=3&s=100"/>
</a>
<img align="right" alt="California State University, Los Angeles" src="http://www.calstatela.edu/sites/default/files/groups/California%20State%20University%2C%20Los%20Angeles/master_logo_full_color_horizontal_centered.svg" style="width: 360px;"/>

#    CIS5560 Term Project Tutorial

------
#### Authors: [Monika Mishra](https://www.linkedin.com/in/monika-mishra-8b2a4115/), [Amogh Mahesh](https://www.linkedin.com/in/amoghmahesh/), [Aakanksha Tasgaonkar](https://www.linkedin.com/in/aakanksha-tasgaonkar-272ba393/)

#### Instructor: [Jongwook Woo](https://www.linkedin.com/in/jongwook-woo-7081a85)

#### Date: 05/12/2019

## Import Spark SQL and Spark ML Libraries
Import the Spark SQL and Spark ML libraries listed below. This is necessary to access the functions those libraries provide.

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler, OneHotEncoder
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

from pyspark.ml.regression import DecisionTreeRegressor, GBTRegressor
from pyspark.ml.classification import LogisticRegression

## Read the CSV file from DBFS (Databricks File System)
1. After the `<filename>.csv` file is added under Data in the left frame, create a table through the UI using "Upload File".
2. Click "Preview Table" to view the table, and select the "First line is header" option, since `<filename>.csv` has a header as its first row.
3. Change the data types of the columns.
4. Click the "Create Table" button.

In [6]:
# DataFrame schema; it should match the table schema
amazon1 = StructType([
  StructField("marketplace", StringType(), False),
  StructField("customer_id", IntegerType(), False),
  StructField("review_id", IntegerType(), False),
  StructField("product_id", StringType(), False),
  StructField("product_parent", IntegerType(), False),
  StructField("product_title", StringType(), False),
  StructField("product_category", StringType(), False),
  StructField("star_rating", DoubleType(), False),
  StructField("helpful_votes", IntegerType(), False),
  StructField("total_votes", IntegerType(), False),
  StructField("vine", StringType(), False),
  StructField("verified_purchase", StringType(), False),
  StructField("review_headline", StringType(), False),
  StructField("review_body", StringType(), False),
  StructField("review_date", StringType(), False)
])

## Create a DataFrame from the table using Spark SQL

In [8]:
tsv = spark.sql("SELECT * FROM ratings_csv")

tsv.show(5)

## Select the relevant columns

In the next step, we will select the columns that are useful for our model.

In [10]:
tsv1 = tsv.select("product_id", "product_parent", "product_title", "product_category", "helpful_votes", "total_votes", "vine", "verified_purchase", "review_headline", "review_body", "review_date","star_rating" )

tsv1.show(5)

## Clean the data

We drop every row that has a null value in any of the columns checked below.

In [12]:
# Drop rows with a null in any of these columns (same result as chaining one filter per column)
required_cols = ["product_category", "helpful_votes", "total_votes", "vine", "verified_purchase",
                 "review_headline", "review_body", "review_date", "star_rating"]
df9 = tsv1.dropna(subset=required_cols)

df9.show(5)

## Select features and label
1. Select the relevant columns into a new DataFrame and define the label.
2. Convert the string-type columns into numeric indexes using StringIndexer.
3. Split the data into training and test sets in a 70:30 ratio.
4. Store the train and test data separately for each model.
5. Use VectorAssembler to define the set of columns used as features.

In [14]:

data = df9.select("product_id", "product_parent", "product_title", "product_category", "helpful_votes", "total_votes", "vine", "verified_purchase",  "review_date",col("star_rating").alias("label"))
#data = df9.select("product_id", "product_parent", "product_title", "product_category", "helpful_votes", "total_votes", "vine", "verified_purchase", "review_headline", "review_body", "review_date",col("star_rating").alias("label"))
data.show(5)

In [15]:
# Index each string column into a numeric "<column>_index" column.
# review_headline and review_body are intentionally left out.
string_cols = ["product_id", "product_title", "product_category", "vine", "verified_purchase", "review_date"]
for c in string_cols:
    data = StringIndexer(inputCol=c, outputCol=c + "_index").fit(data).transform(data)

data.show(5)
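
As a rough illustration of what the `StringIndexer` step above produces, here is a plain-Python sketch of its default `frequencyDesc` ordering: the most frequent value gets index 0.0, the next most frequent 1.0, and so on. The helper name `fit_string_index` and the sample values are hypothetical, and breaking ties alphabetically is an assumption of this sketch; it is not Spark's implementation.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to a float index, most frequent first.

    Hypothetical helper that mimics StringIndexer's default ordering;
    ties are broken alphabetically (an assumption for this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# Made-up sample column, not taken from the dataset
vine = ["N", "N", "Y", "N", "Y", "N"]
mapping = fit_string_index(vine)
indexed = [mapping[v] for v in vine]
print(mapping)   # {'N': 0.0, 'Y': 1.0}
print(indexed)   # [0.0, 0.0, 1.0, 0.0, 1.0, 0.0]
```

Note that a high-cardinality column such as `product_id` yields as many index values as it has distinct strings, which matters later when the tree model bins its features.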

## Split the data
In the next step, we split the data into a training set and a test set in a ratio of **70 to 30**.

In [17]:
# Split the data
splits = data.randomSplit([0.7, 0.3])

# for decision tree regression
dt_train = splits[0]
dt_test = splits[1].withColumnRenamed("label", "trueLabel")


# for gradient boosted tree regression
gbt_train = splits[0]
gbt_test = splits[1].withColumnRenamed("label", "trueLabel")

print("Training Rows:", gbt_train.count(), " Testing Rows:", gbt_test.count())
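
`randomSplit` assigns rows to the splits at random, so the 70/30 ratio holds only approximately, and without a `seed` argument the split differs from run to run. The plain-Python sketch below illustrates the idea under the assumption that each row is assigned independently by a single uniform draw; the function name `random_split` and the seed value are illustrative only.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by a uniform draw against cumulative weights."""
    total = sum(weights)
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total          # normalize so the weights sum to 1
        bounds.append(running)
    rng = random.Random(seed)         # a fixed seed makes the split repeatable
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # the last split also catches any floating-point remainder
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(10_000))
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))  # roughly 7000 and 3000, not exactly
```

In PySpark, the same repeatability can be requested with `data.randomSplit([0.7, 0.3], seed=42)`; the seed value itself is arbitrary.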

In [18]:
assembler = VectorAssembler(inputCols = ["product_parent", "helpful_votes", "total_votes", "product_id_index", "product_title_index", "product_category_index", "vine_index", "verified_purchase_index","review_date_index"], outputCol="features")
#assembler = VectorAssembler(inputCols = ["product_parent", "helpful_votes", "total_votes", "product_id_index", "product_title_index", "product_category_index", "vine_index", "verified_purchase_index", "review_headline_index","review_body_index","review_date_index"], outputCol="features")

## Algorithms used to Train the model

In this project we have used the following algorithm to train our model:

1. Decision Tree Regressor

## Decision Tree Regression Model

In [21]:
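# maxBins must be at least the number of distinct values in the largest indexed (categorical) feature
dt = DecisionTreeRegressor(featuresCol='features', labelCol='label', maxBins=77582)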
dt_pipeline = Pipeline(stages=[assembler, dt])

In [22]:
paramGrid = ParamGridBuilder().build()
cv = CrossValidator(estimator=dt_pipeline, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid, numFolds=5)

In [23]:
dt_model = cv.fit(dt_train)
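
Because the parameter grid above is empty, `CrossValidator` has only one configuration to evaluate. It trains on 4 of the 5 folds, scores on the held-out fold, averages the 5 scores, and then refits on all of `dt_train`. The plain-Python sketch below shows that averaging on made-up ratings. It uses a trivial mean predictor as a stand-in for the tree and contiguous folds instead of Spark's random fold assignment; both are simplifications, and the helper names are hypothetical.

```python
import math

def k_fold_indices(n, k):
    """Yield (train_idx, valid_idx) pairs for k roughly equal contiguous folds."""
    fold_sizes = [n // k + (1 if i < n % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        valid = list(range(start, start + size))
        train = [i for i in range(n) if i < start or i >= start + size]
        yield train, valid
        start += size

def rmse(actual, predicted):
    """Root mean square error between two equal-length sequences."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

# Made-up star ratings; the "model" simply predicts the training mean
labels = [5.0, 4.0, 5.0, 1.0, 3.0, 5.0, 2.0, 4.0, 5.0, 4.0]
scores = []
for train_idx, valid_idx in k_fold_indices(len(labels), k=5):
    mean_rating = sum(labels[i] for i in train_idx) / len(train_idx)
    fold_actual = [labels[i] for i in valid_idx]
    scores.append(rmse(fold_actual, [mean_rating] * len(fold_actual)))

avg_rmse = sum(scores) / len(scores)  # the cross-validated metric
print(len(scores), avg_rmse)
```

With a non-empty grid, the same average would be computed once per parameter combination and the best-scoring combination kept.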

In [24]:
dt_prediction = dt_model.transform(dt_test)
dt_predicted = dt_prediction.select("features", "prediction", "trueLabel")
dt_predicted.show(10)

## Test the model
We transform the test dataframe to generate label predictions.

### Retrieve the Root Mean Square Error (RMSE)
Several metrics measure the difference between predicted and actual values. Of these, the root mean square error (RMSE) is commonly used and is expressed in the same units as the predicted and actual values, so in this case the RMSE indicates roughly how many stars a predicted rating deviates from the actual star rating. You can use the **RegressionEvaluator** class to retrieve the RMSE.

In [27]:
dt_evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
dt_rmse = dt_evaluator.evaluate(dt_prediction)
print("Root Mean Square Error (RMSE):", dt_rmse)
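
For reference, the RMSE over n test rows with actual ratings y and predictions ŷ is defined as shown below. The four ratings in the worked line are made up for illustration and are not taken from the dataset.

$$
\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}
$$

$$
y = (5, 1, 4, 3),\quad \hat{y} = (3, 3, 3, 3)
\;\Rightarrow\;
\mathrm{RMSE} = \sqrt{\frac{2^2 + (-2)^2 + 1^2 + 0^2}{4}} = \sqrt{2.25} = 1.5
$$

Because the errors are squared before averaging, a few large misses raise the RMSE more than many small ones.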

#### Root Mean Square Error (RMSE) for Decision Tree Regression Model: 1.1911385148262073