# Exploring The Data

<b> <ul>
    <li>Chúng ta sử Dataset này để xây dựng một mô hình dự đoán bằng thuật toán Decision Tree bằng MLLib của Spark
    và được thực hiện bằng Pyspark của Python.
    </li>
    <li>Black fridat là một bộ dữ liệu được thu thập từ một cửa hàng bán lẻ về các giao dịch trong ngày blackfriday.Có thể download dataset từ https://www.kaggle.com/sdolezel/black-friday
    </li>
    <li>
        Chúng tôi sử dụng thuật toán Decision Tree của nhóm thuật toán Regression để thực hiện việc dự đoán biến liên tục Purchase dự trên các biến phân loại còn lại của tập dữ liệu như Product_ID,Occupation,Gender,...
    </li>
    </ul>
    
</b>

## 1 - Load dataset 
       

#### Bạn thay đổi đường dẫn tương ứng với đường dẫn tới thư mục chứa dataset trên máy của bạn

In [38]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Project-Bigdata").getOrCreate()
df = spark.read.csv('../Spark_Files/spark-warehouse/BlackFriday.csv',header=True,inferSchema=True)
df.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



#### Sử dụng phương thức .show( ) để hiển thị 20 rows đầu của dataset (Đối với phương thức này rất khó để bạn quát sát được đối với dataset có nhiều biến.Quan sát kết quả bạn sẽ thấy)

In [39]:
df.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

### Quan sát dataset ta thấy :
#### Dataset gồm có 12 biến : 
#### Trong đó các biến input : User_ID, Product_ID, Gender, Age,Occupation, City_Category, Stay_In_Current_City_Years, Marital_Status, Product_Category_1, Product_Category_2, Product_Category_3
#### Biến output là : Purchase

#### Quan sát dữ liệu của 5 dòng đầu tiên trong dataset.Sử dụng Pandas Datafram có thể nhìn rõ hơn là Spark Dataframe.show()

In [40]:
import pandas as pd
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
User_ID,1000001,1000001,1000001,1000001,1000002
Product_ID,P00069042,P00248942,P00087842,P00085442,P00285442
Gender,F,F,F,F,M
Age,0-17,0-17,0-17,0-17,55+
Occupation,10,10,10,10,16
City_Category,A,A,A,A,C
Stay_In_Current_City_Years,2,2,2,2,4+
Marital_Status,0,0,0,0,0
Product_Category_1,3,1,12,12,8
Product_Category_2,,6,,14,


## 2 - Kiểm tra giá trị thiếu

<b>Chúng tôi xây dựng một hàm để kiểm tra số lượng giá trị thiếu của mỗi cột và trả ra số lượng giá trị thiếu đó</b>

In [41]:
from pyspark.sql.functions import col, count, isnan, lit, sum
def count_null(c, nan_as_null=False):
    """Use conversion between boolean and integer
    - False -> 0
    - True ->  1
    """
    pred = col(c).isNull() & (isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)


<b>Ta sử dụng hàm agg() để thực hiện việc tính toán của hàm count_null (đếm giá trị thiếu trên dataset theo từng cột) và sử dụng toPandas() để hiển thị kết quả</b>

In [42]:
df.agg(*[count_null(c) for c in df.columns]).toPandas()

Unnamed: 0,User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase
0,0,0,0,0,0,0,0,0,0,166986,373299,0


<b>Quan sát ta thấy có 2 cột có giá trị thiếu đó là Product_Category_2 và Product_Category_3 lần lượt là 166986 và 373299</b> 

#### Ta xử lí giá trị thiếu này bằng cách sử dụng phương Imputer của MLlib để điền vào các vị trí null của tập dữ liệu.Imputer sẽ thực hiện tính giá trị trung bình của cột đầu vào và điền vào các vị trí null của cột đó.
#### Đối với phương thức này yêu cầu các đầu vào phải có kiểu giá trị là float nên ta sẽ ép kiểu giá trị 2 cột này sang float.
#### Quan sát code bên dưới

In [43]:
from pyspark.sql.types import FloatType
df = df.withColumn("Product_Category_2", df["Product_Category_2"].cast(FloatType()))
df = df.withColumn("Product_Category_3", df["Product_Category_3"].cast(FloatType()))
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=["Product_Category_2", "Product_Category_3"], outputCols=["Out_Product_Category_2", "Out_Product_Category_3"])
df = imputer.fit(df).transform(df)

#### Như vậy là ta đã có được 2 cột mới được tạo ra và thêm vào dataset ban đầu với tên như đã khởi tạo là Out_Product_Category_2 ,Out_Product_Category_3 ứng với 2 cột ban đầu là Product_Category_2 và Product_Category_3.Tuy nhiên 2 cột mới này đã được xử lý giá trị thiếu.
### Quan sát lại dataset để kiểm tra

In [44]:
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
User_ID,1000001,1000001,1000001,1000001,1000002
Product_ID,P00069042,P00248942,P00087842,P00085442,P00285442
Gender,F,F,F,F,M
Age,0-17,0-17,0-17,0-17,55+
Occupation,10,10,10,10,16
City_Category,A,A,A,A,C
Stay_In_Current_City_Years,2,2,2,2,4+
Marital_Status,0,0,0,0,0
Product_Category_1,3,1,12,12,8
Product_Category_2,,6,,14,


## 3 - Tóm tắt các số liệu thống kê của các biến kiểu số

In [45]:
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
User_ID,537577,1002991.8470284257,1714.3926953614418,1000001,1006040
Occupation,537577,8.082710011775058,6.524119769935589,0,20
Marital_Status,537577,0.4087972513705013,0.4916121522296798,0,1
Product_Category_1,537577,5.295546498455105,3.7507009498797985,1,18
Purchase,537577,9333.859852635065,4981.022132656485,185,23961


## 4 - Xây dưng pipeline để tiền xử lý dữ liệu

### Tương tự như việc thực hiện phương thức xử lí giá trị thiếu bên trên, ta tiến hành xử lý các dữ liệu còn lại 

### Các bạn xem lại lý thuyết Pipeline , Estimator, Transformer để biết cách hoạt động và thực hiện các đối tượng này.Đoạn code bên dưới chúng tôi sử dụng các phương thức như StringIndexer ,  OneHotEncoderEstimator , VectorAssembler để xử lý các thuộc tính phân loại như 'Product_ID', 'Gender', 'Age', 'Stay_In_Current_City_Years' .Các bạn có thể tìm hiểu các phương thức đó là gì tại Document của Spark.
### Ở đây tôi muốn thực hiện việc biến đổi các giá trị phân loại của các biến phần loại này thành các chỉ số phân loại của nó bằng phương thức StringIndexer.Sau đó sử dụng phương thức OneHotEncoderEstimator để mã hóa các giá trị vừa biến đối trên thành vector nhị phân.Và cuối cùng là chúng tôi gọp các thuộc tính phân loại đã biến đổi trên với các thuộc tính kiểu số lại thành 1 vector là lấy tên là features bằng phương thức VectorAssembler.

In [46]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ['Product_ID', 'Gender', 'Age', 'Stay_In_Current_City_Years']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
numericCols = ['Occupation', 'Marital_Status', 'Product_Category_1', 'Out_Product_Category_2', 'Out_Product_Category_3']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

### Các bạn có thể thấy tôi sử dụng biến stages để đại diện có các giai đoạn chúng tôi xử lý dữ liệu đã miêu tả bên trên.Cuối cùng là tôi sử dụng đối tượng Pipeline để thực hiện việc xử lý đó trên tập dữ liệu của chúng ta.

In [47]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
df.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: float (nullable = true)
 |-- Product_Category_3: float (nullable = true)
 |-- Purchase: integer (nullable = true)
 |-- Out_Product_Category_2: float (nullable = true)
 |-- Out_Product_Category_3: float (nullable = true)
 |-- Product_IDIndex: double (nullable = false)
 |-- Product_IDclassVec: vector (nullable = true)
 |-- GenderIndex: double (nullable = false)
 |-- GenderclassVec: vector (nullable = true)
 |-- AgeIndex: double (nullable = false)
 |-- AgeclassVec: vector (nullable = true)
 |-- Stay_In_Current_City_YearsIndex: double (nullable = false)
 |-- Stay_In_Cu

### Các bạn quan sát bây giờ dataset của chúng ta đã có thêm những cột mới được thêm vào.Đó là các cột đã được xử lý dữ liệu và có một cột features ở cuối, đó là cột thuộc tính đã diện cho các cột còn lại đã được biến đổi thành 1 vector.

In [48]:
selectedCols = ['Purchase', 'features']
df = df.select(selectedCols)
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
Purchase,8370,15200,1422,1057,7969
features,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


## 5 - Chia dataset của chúng ta thành 2 tập dữ liệu là Train và Test để tiến hành xây dựng model.
### Chúng ta sẽ sử dụng tập Train để học model và tập Test để kiểm tra mô hình dự đoán .

In [49]:
train, test = df.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 376137
Test Dataset Count: 161440


In [50]:
train.printSchema()

root
 |-- Purchase: integer (nullable = true)
 |-- features: vector (nullable = true)



## 6 - Train model

In [51]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator


In [52]:
dt = DecisionTreeRegressor(featuresCol = 'features',labelCol= 'Purchase')

In [53]:
model = dt.fit(train)

## 7 - Test model

In [54]:
predictions = model.transform(test)

In [55]:
predictions.select("prediction", "Purchase", "features").show(5)

+-----------------+--------+--------------------+
|       prediction|Purchase|            features|
+-----------------+--------+--------------------+
|723.2159661644198|     188|(3638,[215,3622,3...|
|723.2159661644198|     188|(3638,[551,3622,3...|
|723.2159661644198|     198|(3638,[1694,3622,...|
|723.2159661644198|     209|(3638,[351,3625,3...|
|723.2159661644198|     210|(3638,[1516,3623,...|
+-----------------+--------+--------------------+
only showing top 5 rows



## 8 - Đánh giá model bằng các độ đo 

$$ RMSE $$

In [58]:
evaluator = RegressionEvaluator(
    labelCol="Purchase", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g " % rmse)

Root Mean Squared Error (RMSE) on test data = 3536.23 


$$ R^2$$

In [59]:
evaluator = RegressionEvaluator(
    labelCol="Purchase", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R-squared (R2)  = %g " % r2)

R-squared (R2)  = 0.496167 
