## Overview

This notebook shows how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html), the Databricks File System, lets you store data for querying inside Databricks. The notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

### Import Spark SQL and Spark ML Libraries
Import the Spark SQL and Spark ML libraries. The functions used in the rest of this notebook come from them.

In [2]:
%pyspark
from pyspark.sql.types import *

from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

from pyspark.sql.functions import *
from pyspark.ml.feature import *
from pyspark.ml import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.mllib.evaluation import *

### To run the code in PySpark CLI

Set the following to True:

PYSPARK_CLI = True

In [4]:
%pyspark
IS_DB = True # Run the code in Databricks

PYSPARK_CLI = False
if PYSPARK_CLI:
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)

In [5]:
%pyspark
eCommerceSchema = StructType([
  StructField("event_time", TimestampType(), False),
  StructField("event_type", StringType(), False),
  StructField("product_id", IntegerType(), False),
  StructField("category_id", LongType(), False),
  StructField("category_code", StringType(), False),
  StructField("brand", StringType(), False),
  StructField("price", DoubleType(), False),
  StructField("user_id", IntegerType(), False),
  StructField("user_session", StringType(), False),
])

### Load Source Data
The data for this exercise is provided as a CSV file containing details of e-commerce items and categories. The data includes specific characteristics (or *features*) for each item, as well as a *label* column indicating the event type of each item.

You will load this data into a DataFrame and display it.


### Read the csv file from HDFS (Hadoop File System)

In [8]:
%pyspark
# File location and type
file_location = "/user/apandey9/5560/2019-Oct_Master.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location) 

df = df.withColumnRenamed('# File format is event_time', 'event_time')
df = df.filter((col("brand") != "No value") & (col("category_code") != "No value"))

df.show()

### Converting the string type columns into integer using withColumn

In [10]:
%pyspark
df = df.withColumn("event_type", when(df.event_type == 'view', 1) \
                  .when(df.event_type == 'cart', 2) \
                  .when(df.event_type == 'purchase', 3))
df.show()
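
The chained `when` calls above have no `otherwise` branch, so any `event_type` other than the three listed values becomes null. The plain-Python sketch below mirrors that mapping outside Spark; `EVENT_CODES` and `encode_event` are hypothetical names used only for illustration.

```python
# Plain-Python illustration of the mapping; not part of the notebook's Spark pipeline.
EVENT_CODES = {"view": 1, "cart": 2, "purchase": 3}

def encode_event(event_type):
    """Return the integer code for an event type, or None if it is not listed."""
    return EVENT_CODES.get(event_type)

# "remove_from_cart" is an invented value that is not in the mapping.
sample = ["view", "cart", "purchase", "remove_from_cart"]
encoded = [encode_event(e) for e in sample]
print(encoded)
```

Rows that end up with a null label would need to be filtered out or given an explicit code before training.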

### Converting the string type columns into indices using StringIndexer

In [12]:
%pyspark
#Converting String Values into Index Values
indexer = StringIndexer(inputCol="brand", outputCol="brandIndex")
indexer1 = StringIndexer(inputCol="category_code", outputCol="category_codeIndex")
df = indexer.fit(df).transform(df) 
df = indexer1.fit(df).transform(df)
df.show()
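
By default, `StringIndexer` assigns indices by descending label frequency, so the most frequent value gets index 0.0. A rough plain-Python sketch of that ordering follows; `index_labels` is a hypothetical helper, the brand list is invented, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter

def index_labels(values):
    """Map each distinct string to a float index, most frequent first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically (assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Invented sample values for illustration only.
brands = ["samsung", "apple", "samsung", "xiaomi", "apple", "samsung"]
mapping = index_labels(brands)
indexed = [mapping[b] for b in brands]
print(mapping)
print(indexed)
```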

In [13]:
%pyspark
df.printSchema()

### Create a temporary view of the dataframe "df"

In [15]:
%pyspark
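# Create a temporary view. An unquoted SQL identifier cannot contain "-" or ".",
# so the view gets its own name instead of reusing the file name.
temp_table_name = "ecommerce_2019_oct"
df.createOrReplaceTempView(temp_table_name)

In [16]:
%pyspark
if PYSPARK_CLI:
    # Note: this re-reads the raw file, so the transformations applied to df above are not present.
    csv = spark.read.csv('2019-Oct_Master.csv', inferSchema=True, header=True)
else:
    csv = spark.sql("SELECT * FROM {}".format(temp_table_name))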

### Selecting features
In the following step, we select the features that are useful for predicting the event type.

In [18]:
%pyspark
data = csv.select("product_id", "brandIndex", "category_codeIndex", "price", "user_id", col("event_type").alias("label"))

### Split the Data
It is common practice when building supervised machine learning models to split the source data, using some of it to train the model and reserving some to test the trained model. In this exercise, you will use 70% of the data for training, and reserve 30% for testing.

In [20]:
%pyspark
# Split the data
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
print ("Training Rows:", train.count(), " Testing Rows:", test.count())
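
`randomSplit` assigns rows to the splits at random, so the 70/30 proportions are approximate, and the counts change between runs unless a seed is supplied (PySpark's `randomSplit` accepts an optional `seed` argument). The sketch below imitates the idea in plain Python; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random

def random_split(rows, train_fraction=0.7, seed=42):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for the rows of a DataFrame
train_rows, test_rows = random_split(rows)
print("Training Rows:", len(train_rows), " Testing Rows:", len(test_rows))
```

Calling `random_split(rows)` again with the same seed returns the same two lists, which is useful when results need to be compared across runs.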


## DECISION TREE CLASSIFIER


### Prepare the Training Data
To train the classification model, you need a training data set that includes a vector of numeric features and a label column. In this exercise, you will use the **VectorAssembler** class to transform the feature columns into a vector.

In [23]:
%pyspark
assembler_dt = VectorAssembler(inputCols=["product_id", "brandIndex", "category_codeIndex", "price", "user_id" ], outputCol="features")

#lr = LinearRegression(labelCol="label", featuresCol="normFeatures")
dt = DecisionTreeClassifier(labelCol="label", featuresCol= "features", maxBins=3000)
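
`VectorAssembler` concatenates the listed input columns, in order, into a single vector column. As a plain-Python illustration of the result for one row (the helper `assemble` and the row values are invented for this example):

```python
FEATURE_COLS = ["product_id", "brandIndex", "category_codeIndex", "price", "user_id"]

def assemble(row, input_cols=FEATURE_COLS):
    """Return the feature vector for one row as a list of floats, in column order."""
    return [float(row[c]) for c in input_cols]

# Hypothetical row; the values are invented for illustration only.
row = {"product_id": 1001, "brandIndex": 0.0, "category_codeIndex": 2.0,
       "price": 199.99, "user_id": 42, "label": 1}
features = assemble(row)
print(features)
```

The label is deliberately left out of the vector; it stays in its own column so the classifier can learn from it.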


### Parameter Building and Train using Cross Validator
CrossValidator evaluates each combination of parameters defined in the parameter grid against multiple folds of the training data and keeps the best-performing combination. Here the number of folds is set to 3.

In [25]:
%pyspark
# TODO: params referred to in the reference above
paramGrid_cv_dt = ParamGridBuilder() \
  .addGrid(dt.maxDepth, [3, 5]) \
  .addGrid(dt.minInfoGain, [0.0]) \
  .build()
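
`ParamGridBuilder` produces one parameter map per combination of the listed values. The equivalent Cartesian product can be sketched with `itertools.product`; the dictionary keys below are plain strings standing in for the `dt.maxDepth` and `dt.minInfoGain` params.

```python
from itertools import product

# Same candidate values as the grid above, keyed by plain strings.
grid_values = {"maxDepth": [3, 5], "minInfoGain": [0.0]}

# One dict per combination, analogous to the list returned by .build().
names = list(grid_values)
param_maps = [dict(zip(names, combo)) for combo in product(*grid_values.values())]

for pm in param_maps:
    print(pm)
print("Combinations:", len(param_maps))
```

Because `minInfoGain` has a single candidate value, only `maxDepth` is actually being tuned here, giving two combinations.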

### Building the Pipeline
Define a pipeline that creates a feature vector and trains a Decision Tree model.

In [27]:
%pyspark
pipeline_cv_dt = Pipeline(stages=[assembler_dt, dt])

K = 3
cv_dt = CrossValidator(estimator=pipeline_cv_dt, evaluator=MulticlassClassificationEvaluator(), estimatorParamMaps=paramGrid_cv_dt, numFolds = K)

model_cv_dt = cv_dt.fit(train)
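
With `numFolds = 3` and two parameter maps, `CrossValidator` trains 3 × 2 = 6 candidate models, then refits the best parameter map on the full training set. The sketch below shows how k-fold splitting partitions row indices; `k_fold_indices` is a hypothetical helper that assigns folds round-robin, whereas Spark assigns rows to folds randomly.

```python
def k_fold_indices(n_rows, k):
    """Yield (train_idx, validation_idx) pairs; each row is validated exactly once."""
    folds = [list(range(i, n_rows, k)) for i in range(k)]  # round-robin assignment
    for i in range(k):
        validation = folds[i]
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train, validation

K = 3
n_param_maps = 2  # maxDepth in [3, 5] combined with minInfoGain in [0.0]
splits_demo = list(k_fold_indices(9, K))
candidate_fits = K * n_param_maps

for train_idx, validation_idx in splits_demo:
    print("train:", train_idx, "validation:", validation_idx)
print("Candidate model fits:", candidate_fits)
```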


### Test the Model
Now you're ready to use the **transform** method of the model to generate some predictions. You can use this approach to predict Event Type where the label is unknown; but in this case you are using the test data which includes a known true label value, so you can compare the predicted event type to the actual event type.

In [29]:
%pyspark
prediction_cv_dt = model_cv_dt.transform(test)
# Display the predictions next to the known labels
prediction_cv_dt.select("features", "prediction", "trueLabel").show()


### Obtaining Accuracy, Test Error, Precision and Recall
The performance of the Decision Tree classifier is measured on the test data using accuracy, test error, weighted precision, and weighted recall.

In [31]:
%pyspark
evaluator_cv_dt = MulticlassClassificationEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="accuracy")
accuracy_cv_dt = evaluator_cv_dt.evaluate(prediction_cv_dt)
print ("Average Accuracy =", accuracy_cv_dt)
print ("Test Error = ", (1 - accuracy_cv_dt))

evaluator_cv_dt = MulticlassClassificationEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="weightedPrecision")
precision_cv_dt = evaluator_cv_dt.evaluate(prediction_cv_dt)
print ("Precision =", precision_cv_dt)

evaluator_cv_dt = MulticlassClassificationEvaluator(labelCol="trueLabel", predictionCol="prediction", metricName="weightedRecall")
Recall_cv_dt = evaluator_cv_dt.evaluate(prediction_cv_dt)
print ("Recall =", Recall_cv_dt)
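
For reference, the three metrics can be reproduced by hand. Weighted precision and weighted recall average the per-class values, weighting each class by its share of the true labels; a consequence is that weighted recall always equals accuracy. The plain-Python sketch below uses invented labels, and `weighted_metrics` is a hypothetical helper rather than Spark's implementation.

```python
from collections import Counter

def weighted_metrics(y_true, y_pred):
    """Return (accuracy, weighted precision, weighted recall)."""
    n = len(y_true)
    true_counts = Counter(y_true)
    pred_counts = Counter(y_pred)
    correct = Counter(t for t, p in zip(y_true, y_pred) if t == p)

    accuracy = sum(correct.values()) / n
    precision = recall = 0.0
    for label, support in true_counts.items():
        weight = support / n  # share of the true labels held by this class
        # A class that is never predicted contributes 0 to precision.
        if pred_counts[label]:
            precision += weight * correct[label] / pred_counts[label]
        recall += weight * correct[label] / support
    return accuracy, precision, recall

# Invented labels: 1 = view, 2 = cart, 3 = purchase.
y_true = [1, 1, 1, 1, 2, 2, 3, 3]
y_pred = [1, 1, 1, 2, 2, 1, 3, 1]
accuracy, precision, recall = weighted_metrics(y_true, y_pred)
print("Accuracy =", accuracy)
print("Test Error =", 1 - accuracy)
print("Precision =", precision)
print("Recall =", recall)
```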

### References

https://towardsdatascience.com/multi-class-text-classification-with-pyspark-7d78d022ed35

https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier

