# Introduction
This notebook uses the **[Black Friday](https://www.kaggle.com/sdolezel/black-friday)** dataset to predict how much a customer is willing to spend on a purchase, based on features such as gender, age, and occupation.

We preprocess the data, then train and evaluate regression models (Decision Tree Regression and Linear Regression).

Languages/Technologies: Apache Spark, PySpark, Pandas, Python

# Loading the data

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession\
        .builder.appName("AnalyzeData_Black_Friday")\
        .config("spark.some.config.option", "some-value")\
        .getOrCreate()

In [2]:
df = spark.read.csv("datasets/black_friday/train.csv", header=True, inferSchema=True)

df.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



The schema shows that the dataset has 12 attributes.

A pandas DataFrame gives a more readable view of the data.

In [3]:
df_pd = df.toPandas() 
df_pd.head(5)

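| | User_ID | Product_ID | Gender | Age | Occupation | City_Category | Stay_In_Current_City_Years | Marital_Status | Product_Category_1 | Product_Category_2 | Product_Category_3 | Purchase |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1000001 | P00069042 | F | 0-17 | 10 | A | 2 | 0 | 3 | NaN | NaN | 8370 |
| 1 | 1000001 | P00248942 | F | 0-17 | 10 | A | 2 | 0 | 1 | 6.0 | 14.0 | 15200 |
| 2 | 1000001 | P00087842 | F | 0-17 | 10 | A | 2 | 0 | 12 | NaN | NaN | 1422 |
| 3 | 1000001 | P00085442 | F | 0-17 | 10 | A | 2 | 0 | 12 | 14.0 | NaN | 1057 |
| 4 | 1000002 | P00285442 | M | 55+ | 16 | C | 4+ | 0 | 8 | NaN | NaN | 7969 |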


For a detailed summary of the data, use pandas `info()`.

Unlike the other columns, Product_Category_2 and Product_Category_3 contain missing values (**NaN**): only 376430 and 166821 of their 550068 entries are non-null.

In [4]:
df_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 550068 entries, 0 to 550067
Data columns (total 12 columns):
 #   Column                      Non-Null Count   Dtype  
---  ------                      --------------   -----  
 0   User_ID                     550068 non-null  int32  
 1   Product_ID                  550068 non-null  object 
 2   Gender                      550068 non-null  object 
 3   Age                         550068 non-null  object 
 4   Occupation                  550068 non-null  int32  
 5   City_Category               550068 non-null  object 
 6   Stay_In_Current_City_Years  550068 non-null  object 
 7   Marital_Status              550068 non-null  int32  
 8   Product_Category_1          550068 non-null  int32  
 9   Product_Category_2          376430 non-null  float64
 10  Product_Category_3          166821 non-null  float64
 11  Purchase                    550068 non-null  int32  
dtypes: float64(2), int32(5), object(5)
memory usage: 39.9+ MB


# Data preprocessing

## Handling NaN values

There are two ways to deal with NaN values:
+ Drop the rows that contain NaN (either only when every observation in the row is NaN, or as soon as any one of them is NaN)
+ Fill the NaN values with a summary statistic of that column, such as the mean or the median

Here I fill the missing values with the column median (the `Imputer` below uses `strategy="median"`).
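To make the two options concrete, here is a small plain-Python sketch on a made-up column. The values and the helper names `drop_missing` and `fill_missing` are illustrative assumptions, not part of the dataset or of PySpark. It contrasts dropping missing entries with filling them by the mean or the median of the observed values.

```python
from statistics import mean, median

# Hypothetical toy column; None stands in for NaN.
column = [6.0, None, 14.0, None, 8.0, 2.0]

def drop_missing(values):
    """Option 1: discard the missing entries."""
    return [v for v in values if v is not None]

def fill_missing(values, strategy="median"):
    """Option 2: replace missing entries with a statistic of the observed ones."""
    observed = drop_missing(values)
    fill = median(observed) if strategy == "median" else mean(observed)
    return [fill if v is None else v for v in values]

dropped = drop_missing(column)
filled_median = fill_missing(column, "median")
filled_mean = fill_missing(column, "mean")
print(dropped)
print(filled_median)
print(filled_mean)
```

Dropping shrinks the data, while filling keeps every row at the cost of introducing synthetic values; with two thirds of Product_Category_3 missing, dropping rows would discard most of the dataset.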

In [6]:
from pyspark.sql.types import FloatType
df = df.withColumn("Product_Category_2", df["Product_Category_2"].cast(FloatType()))
df = df.withColumn("Product_Category_3", df["Product_Category_3"].cast(FloatType()))

In [7]:
df_pd = df.toPandas()
df_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 550068 entries, 0 to 550067
Data columns (total 12 columns):
 #   Column                      Non-Null Count   Dtype  
---  ------                      --------------   -----  
 0   User_ID                     550068 non-null  int32  
 1   Product_ID                  550068 non-null  object 
 2   Gender                      550068 non-null  object 
 3   Age                         550068 non-null  object 
 4   Occupation                  550068 non-null  int32  
 5   City_Category               550068 non-null  object 
 6   Stay_In_Current_City_Years  550068 non-null  object 
 7   Marital_Status              550068 non-null  int32  
 8   Product_Category_1          550068 non-null  int32  
 9   Product_Category_2          376430 non-null  float32
 10  Product_Category_3          166821 non-null  float32
 11  Purchase                    550068 non-null  int32  
dtypes: float32(2), int32(5), object(5)
memory usage: 35.7+ MB


In [8]:
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy="median",
                  inputCols=["Product_Category_2", "Product_Category_3"], 
                  outputCols=["FilledNA_Product_Category_2", "FilledNA_Product_Category_3"])
df = imputer.fit(df).transform(df)

In [9]:
df_pd = df.toPandas()
df_pd.head(5)

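| | User_ID | Product_ID | Gender | Age | Occupation | City_Category | Stay_In_Current_City_Years | Marital_Status | Product_Category_1 | Product_Category_2 | Product_Category_3 | Purchase | FilledNA_Product_Category_2 | FilledNA_Product_Category_3 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1000001 | P00069042 | F | 0-17 | 10 | A | 2 | 0 | 3 | NaN | NaN | 8370 | 9.0 | 14.0 |
| 1 | 1000001 | P00248942 | F | 0-17 | 10 | A | 2 | 0 | 1 | 6.0 | 14.0 | 15200 | 6.0 | 14.0 |
| 2 | 1000001 | P00087842 | F | 0-17 | 10 | A | 2 | 0 | 12 | NaN | NaN | 1422 | 9.0 | 14.0 |
| 3 | 1000001 | P00085442 | F | 0-17 | 10 | A | 2 | 0 | 12 | 14.0 | NaN | 1057 | 14.0 | 14.0 |
| 4 | 1000002 | P00285442 | M | 55+ | 16 | C | 4+ | 0 | 8 | NaN | NaN | 7969 | 9.0 | 14.0 |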


In [10]:
df_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 550068 entries, 0 to 550067
Data columns (total 14 columns):
 #   Column                       Non-Null Count   Dtype  
---  ------                       --------------   -----  
 0   User_ID                      550068 non-null  int32  
 1   Product_ID                   550068 non-null  object 
 2   Gender                       550068 non-null  object 
 3   Age                          550068 non-null  object 
 4   Occupation                   550068 non-null  int32  
 5   City_Category                550068 non-null  object 
 6   Stay_In_Current_City_Years   550068 non-null  object 
 7   Marital_Status               550068 non-null  int32  
 8   Product_Category_1           550068 non-null  int32  
 9   Product_Category_2           376430 non-null  float32
 10  Product_Category_3           166821 non-null  float32
 11  Purchase                     550068 non-null  int32  
 12  FilledNA_Product_Category_2  550068 non-null  float32
 13 

**Observation**: The NaN values in Product_Category_2 and Product_Category_3 have been filled with the column median, producing two new columns whose non-null count matches that of the other columns (550068).

## Building a feature transformation pipeline for the model

In [11]:
df.dtypes

[('User_ID', 'int'),
 ('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'string'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'string'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'float'),
 ('Product_Category_3', 'float'),
 ('Purchase', 'int'),
 ('FilledNA_Product_Category_2', 'float'),
 ('FilledNA_Product_Category_3', 'float')]

In [12]:
from pyspark.sql.types import IntegerType

# first, convert the two imputed columns to integer type
df = df.withColumn("FilledNA_Product_Category_2", df["FilledNA_Product_Category_2"].cast(IntegerType()))
df = df.withColumn("FilledNA_Product_Category_3", df["FilledNA_Product_Category_3"].cast(IntegerType()))

# describe all integer-typed columns
int_features = [name for name, dtype in df.dtypes if dtype == 'int']
df.select(int_features).toPandas().describe()

| | User_ID | Occupation | Marital_Status | Product_Category_1 | Purchase | FilledNA_Product_Category_2 | FilledNA_Product_Category_3 |
|---|---|---|---|---|---|---|---|
| count | 550068.0 | 550068.0 | 550068.0 | 550068.0 | 550068.0 | 550068.0 | 550068.0 |
| mean | 1003029.0 | 8.076707 | 0.409653 | 5.40427 | 9263.968713 | 9.576434 | 13.596114 |
| std | 1727.592 | 6.52266 | 0.49177 | 3.936211 | 5023.065394 | 4.226025 | 2.352863 |
| min | 1000001.0 | 0.0 | 0.0 | 1.0 | 12.0 | 2.0 | 3.0 |
| 25% | 1001516.0 | 2.0 | 0.0 | 1.0 | 5823.0 | 8.0 | 14.0 |
| 50% | 1003077.0 | 7.0 | 0.0 | 5.0 | 8047.0 | 9.0 | 14.0 |
| 75% | 1004478.0 | 14.0 | 1.0 | 8.0 | 12054.0 | 14.0 | 14.0 |
| max | 1006040.0 | 20.0 | 1.0 | 20.0 | 23961.0 | 18.0 | 18.0 |


In [13]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

categorical_cols = ['Product_ID', 'Gender', 'Age', 'City_Category', 'Stay_In_Current_City_Years']
numerical_cols = ['Occupation', 'Marital_Status', 'Product_Category_1', 
                  'FilledNA_Product_Category_2', 'FilledNA_Product_Category_3']

features_pre_processing = []
for categoricalCol in categorical_cols:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + '_Indexed')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], 
                                     outputCols=[categoricalCol + "_classVec"])
    features_pre_processing += [stringIndexer, encoder]
assembler = VectorAssembler(
    inputCols=[cate + '_classVec' for cate in categorical_cols] + numerical_cols,
    outputCol="features"
)
features_pre_processing.append(assembler)

pipeline_prep_feature = Pipeline(stages = features_pre_processing)
df = pipeline_prep_feature.fit(df).transform(df)
df.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: float (nullable = true)
 |-- Product_Category_3: float (nullable = true)
 |-- Purchase: integer (nullable = true)
 |-- FilledNA_Product_Category_2: integer (nullable = true)
 |-- FilledNA_Product_Category_3: integer (nullable = true)
 |-- Product_ID_Indexed: double (nullable = false)
 |-- Product_ID_classVec: vector (nullable = true)
 |-- Gender_Indexed: double (nullable = false)
 |-- Gender_classVec: vector (nullable = true)
 |-- Age_Indexed: double (nullable = false)
 |-- Age_classVec: vector (nullable = true)
 |-- City_Category_Indexed: double (nullable = false)

In [14]:
# index the Purchase column to create the label column
# (StringIndexer assigns indices by descending frequency by default)
label_pre_processing = StringIndexer(inputCol ="Purchase", outputCol = 'Purchase_Label')

In [15]:
df = label_pre_processing.fit(df).transform(df)

In [16]:
df = df.select(['features', 'Purchase_Label'])
df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Purchase_Label: double (nullable = false)
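
Note that `StringIndexer` assigns indices by descending frequency by default, so `Purchase_Label` holds a frequency rank for each distinct `Purchase` value rather than the amount itself. The plain-Python sketch below uses made-up amounts to show the effect; the helper name `frequency_index` is hypothetical. If the goal is to predict the amount directly, one alternative (not what this notebook does) would be to cast `Purchase` to a double and use that column as the label.

```python
from collections import Counter

# Hypothetical purchase amounts.
purchases = [8370, 1422, 8370, 15200, 1422, 8370]

def frequency_index(values):
    """Index distinct values by descending frequency, ties broken by string order."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], str(v)))
    return {value: float(i) for i, value in enumerate(ordered)}

labels = frequency_index(purchases)
indexed = [labels[p] for p in purchases]
print(labels)
print(indexed)
```

In this toy example the most frequent amount gets index 0.0 regardless of its size, so the metrics reported later describe how well the models predict this index, not the purchase amount.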



# Training
Using Decision Tree Regression

In [18]:
# split dataset
train_data, test_data = df.randomSplit([0.7, 0.3], seed = 1712)
print(f"Training Dataset Count: {train_data.count()}")
print(f"Test Dataset Count: {test_data.count()}")

Training Dataset Count: 385169
Test Dataset Count: 164899
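
The counts above are close to, but not exactly, 70% and 30% of the 550068 rows. That is consistent with a split that assigns each row independently at random instead of cutting the data at a fixed position. A minimal plain-Python sketch of that idea follows, using a hypothetical helper `random_split` and toy rows; it illustrates the principle and is not Spark's actual implementation.

```python
import random

def random_split(rows, train_fraction, seed):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # toy stand-in for the DataFrame rows
train, test = random_split(rows, 0.7, seed=1712)
print(len(train), len(test))
```

Fixing the seed makes the split reproducible, which keeps the two models trained later comparable on the same test rows.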


In [19]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol = 'features',labelCol= 'Purchase_Label', maxDepth=5)

model = dt.fit(train_data)

# Testing

In [20]:
predictions = model.transform(test_data)

In [27]:
pred_pd = predictions.select("prediction", "Purchase_Label", "features").toPandas()
pred_pd.head(5)

| | prediction | Purchase_Label | features |
|---|---|---|---|
| 0 | 1165.067474 | 1750.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |
| 1 | 1165.067474 | 1156.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |
| 2 | 1165.067474 | 549.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |
| 3 | 1165.067474 | 1619.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |
| 4 | 1165.067474 | 1016.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |


In [30]:
pred_pd['prediction'].value_counts()

3648.876031     41654
1693.449638     34048
1341.165307     33872
1165.067474     11308
5873.796660      8390
7199.053400      7160
8472.075606      6155
7831.912838      6067
8643.937055      3983
11117.175804     2341
5954.063788      2227
4437.754187      1570
12809.684698     1401
5351.138273      1361
9721.573770      1159
7039.253474       751
1120.901357       498
4303.245053       279
4549.741117       167
4011.314917        86
11664.834225       67
3464.495495        63
5077.907692        60
13773.272727       46
14767.116667       46
10774.814815       33
3347.961039        32
8170.202899        29
4663.464789        24
13779.589041       17
15595.428571        3
9282.571429         2
Name: prediction, dtype: int64
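
The listing above contains 32 distinct prediction values. That matches what a binary decision tree with `maxDepth=5` can produce: at most 2^5 = 32 leaves, each returning a single constant, which for a regression tree is the mean label of the training rows that fall into that leaf. A tiny sketch with the hypothetical helpers `max_leaves` and `leaf_prediction`:

```python
from statistics import mean

def max_leaves(max_depth):
    """Upper bound on the number of leaves of a binary tree of the given depth."""
    return 2 ** max_depth

def leaf_prediction(labels_in_leaf):
    """A regression leaf predicts one constant: the mean of its training labels."""
    return mean(labels_in_leaf)

print(max_leaves(5))
print(leaf_prediction([1000.0, 1200.0, 1300.0]))
```

This also explains why the first five test rows all receive the same prediction: they end up in the same leaf.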

# Evaluate

In [29]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="Purchase_Label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R-squared (R2)  = %g " % r2)

R-squared (R2)  = 0.615319 
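
For reference, R2 compares the model's squared error with that of always predicting the mean label: R2 = 1 - SS_res / SS_tot. A self-contained sketch with made-up numbers follows; the helper `r_squared` is illustrative and is not the `RegressionEvaluator` implementation.

```python
def r_squared(labels, predictions):
    """R2 = 1 - (sum of squared residuals) / (total sum of squares)."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

labels = [1.0, 2.0, 3.0, 4.0]
predictions = [1.5, 2.0, 2.5, 4.0]
print(r_squared(labels, predictions))
```

A value of 1 means perfect predictions, and 0 means the model does no better than predicting the mean label for every row.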


# Training
Using Linear Regression

In [32]:
from pyspark.ml.regression import LinearRegression

# name the estimator lr so it is not confused with the decision tree (dt) above
lr = LinearRegression(featuresCol='features', labelCol='Purchase_Label',
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)

model = lr.fit(train_data)
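
The `regParam` and `elasticNetParam` arguments control an elastic-net penalty added to the squared-error loss. As documented for Spark ML, with lambda = `regParam` and alpha = `elasticNetParam`, the penalty is alpha * lambda * ||w||_1 + (1 - alpha) * (lambda / 2) * ||w||_2^2, so alpha = 0.8 weights it mostly towards L1 (lasso). A small sketch with a hypothetical weight vector:

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """alpha * lambda * ||w||_1 + (1 - alpha) * (lambda / 2) * ||w||_2^2."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return (elastic_net_param * reg_param * l1
            + (1.0 - elastic_net_param) * (reg_param / 2.0) * l2_squared)

weights = [1.0, -2.0, 0.0]  # hypothetical coefficients
penalty = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8)
print(penalty)
```

Setting `elasticNetParam` to 0 would give a pure L2 (ridge) penalty and setting it to 1 a pure L1 (lasso) penalty.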

# Testing

In [33]:
predictions = model.transform(test_data)

In [34]:
pred_pd = predictions.select("prediction", "Purchase_Label", "features").toPandas()
pred_pd.head(5)

| | prediction | Purchase_Label | features |
|---|---|---|---|
| 0 | 1251.281856 | 1750.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |
| 1 | 1245.776134 | 1156.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |
| 2 | 1239.892325 | 549.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |
| 3 | 1240.258296 | 1619.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |
| 4 | 1253.535436 | 1016.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |


# Evaluate

In [35]:
evaluator = RegressionEvaluator(labelCol="Purchase_Label", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R-squared (R2)  = %g " % r2)

R-squared (R2)  = 0.627529 
