In [1]:
import findspark
findspark.init()
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as f
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler

### **Create Spark session**

In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local")\
          .appName("StructureAPI")\
          .config("spark.some.config.option", "some-value")\
          .getOrCreate()

### **Explore Data Analysis**

In [3]:
data = spark.read.load("sales_train.csv", format="csv", header=True, delimiter=",")
data.show(5)

+----------+--------------+-------+-------+----------+------------+
|      date|date_block_num|shop_id|item_id|item_price|item_cnt_day|
+----------+--------------+-------+-------+----------+------------+
|02.01.2013|             0|     59|  22154|     999.0|         1.0|
|03.01.2013|             0|     25|   2552|     899.0|         1.0|
|05.01.2013|             0|     25|   2552|     899.0|        -1.0|
|06.01.2013|             0|     25|   2554|   1709.05|         1.0|
|15.01.2013|             0|     25|   2555|    1099.0|         1.0|
+----------+--------------+-------+-------+----------+------------+
only showing top 5 rows



In [40]:
data.count()

2935849

Tiếp theo xét xem có dòng nào bị thiếu giá trị hay không.

In [41]:
data.select([f.count(f.when(f.col(c).contains('None') | \
                            f.col(c).contains('NULL') | \
                            (f.col(c) == '' ) | \
                            f.col(c).isNull() | \
                            f.isnan(c), c 
                           )).alias(c)
                    for c in data.columns]).show()

+----+--------------+-------+-------+----------+------------+
|date|date_block_num|shop_id|item_id|item_price|item_cnt_day|
+----+--------------+-------+-------+----------+------------+
|   0|             0|      0|      0|         0|           0|
+----+--------------+-------+-------+----------+------------+



- **Date**: date in format dd/mm/yyyy
- **date_block_num**: a consecutive month number, used for convenience. January 2013 is 0, February 2013 is 1,..., October 2015 is 33
- **shop_id**: unique identifier of a shop
- **item_id**: unique identifier of an item
- **item_price**: price of an item
- **item_cnt_day**: number of products sold

In [34]:
data.printSchema()

root
 |-- date: string (nullable = true)
 |-- date_block_num: string (nullable = true)
 |-- shop_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_price: string (nullable = true)
 |-- item_cnt_day: string (nullable = true)



In [4]:
data = data.withColumn('date',f.to_date(data.date,'dd.MM.yyyy'))
data = data.withColumn("date_block_num",data.date_block_num.cast('int'))
data = data.withColumn("item_price",data.item_price.cast('double'))
data = data.withColumn("item_cnt_day",data.item_cnt_day.cast('int'))

Ở đây ta thấy các cột numeric đang là dạng string nên chúng ta sẽ cast chúng lại thành dạng dữ liệu số

In [5]:
data.printSchema()

root
 |-- date: date (nullable = true)
 |-- date_block_num: integer (nullable = true)
 |-- shop_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item_price: double (nullable = true)
 |-- item_cnt_day: integer (nullable = true)



In [6]:
data.summary().show()

+-------+-----------------+------------------+------------------+------------------+------------------+
|summary|   date_block_num|           shop_id|           item_id|        item_price|      item_cnt_day|
+-------+-----------------+------------------+------------------+------------------+------------------+
|  count|          2935849|           2935849|           2935849|           2935849|           2935849|
|   mean|14.56991146343017|33.001728290521754|10197.227056977385| 890.8532326979881| 1.242640885140891|
| stddev|9.422987708755725| 16.22697304833424|6324.2973538914575|1729.7996307126411|2.6188344308954035|
|    min|                0|                 0|                 0|              -1.0|               -22|
|    25%|                7|              22.0|            4476.0|             249.0|                 1|
|    50%|               14|              31.0|            9346.0|             399.0|                 1|
|    75%|               23|              47.0|           15684.0

- Sau khi mô tả dữ liệu, ta thấy được cột **item_cnt_day** và cột **item_price** có giá trị bất thường nhỏ hơn 0.

In [14]:
len(data.where(data.item_cnt_day < 0).collect())

7356

- Số lượng dòng bất thường trên cột **item_cnt_day** là 7356

In [15]:
len(data.where(data.item_price < 0).collect())

1

- Số lượng dòng bất thường trên cột **item_price** là 1

Số lượng dòng bất thường khá ít so với tổng số lượng dòng mà chúng ta đang có nên chúng ta sẽ loại bỏ chúng đi

In [16]:
data = data.where(data.item_price > 0).where(data.item_cnt_day > 0)
data.summary().show()

+-------+------------------+------------------+------------------+-----------------+------------------+
|summary|    date_block_num|           shop_id|           item_id|       item_price|      item_cnt_day|
+-------+------------------+------------------+------------------+-----------------+------------------+
|  count|           2928492|           2928492|           2928492|          2928492|           2928492|
|   mean|14.569763209187528|33.002952372757036|10200.281966623095|889.4667512710128|1.2483373695403641|
| stddev| 9.422951383876265|16.225428560235574| 6324.395925384864|1727.498582243899| 2.619586031516712|
|    min|                 0|                 0|                 0|             0.07|                 1|
|    max|                33|                 9|              9999|         307980.0|              2169|
+-------+------------------+------------------+------------------+-----------------+------------------+



Ta sẽ tách cột tháng ra từ cột date, vì các tháng trong năm cũng có thể quyết định số lượng  mua hàng

In [17]:
data = data.withColumn('month',(f.floor(data['date_block_num']/12)+1).cast('int'))
data.show(5)

+----------+--------------+-------+-------+----------+------------+-----+
|      date|date_block_num|shop_id|item_id|item_price|item_cnt_day|month|
+----------+--------------+-------+-------+----------+------------+-----+
|2013-01-02|             0|     59|  22154|     999.0|           1|    1|
|2013-01-03|             0|     25|   2552|     899.0|           1|    1|
|2013-01-06|             0|     25|   2554|   1709.05|           1|    1|
|2013-01-15|             0|     25|   2555|    1099.0|           1|    1|
|2013-01-10|             0|     25|   2564|     349.0|           1|    1|
+----------+--------------+-------+-------+----------+------------+-----+
only showing top 5 rows



### **Model training with MLib**

#### Train test split

In [18]:
trainingData = data.where(data.date_block_num <28)
testData = data.where(data.date_block_num>=28)

Tiếp theo các đặc trưng huấn luyện sẽ được kết hợp lại thành cột features.

In [19]:
assembler = VectorAssembler(
    inputCols=['month','item_price','date_block_num'],
    outputCol='features',
    handleInvalid='keep'
)

In [20]:
assembler.transform(trainingData).show(5)

+----------+--------------+-------+-------+----------+------------+-----+-----------------+
|      date|date_block_num|shop_id|item_id|item_price|item_cnt_day|month|         features|
+----------+--------------+-------+-------+----------+------------+-----+-----------------+
|2013-01-02|             0|     59|  22154|     999.0|           1|    1|  [1.0,999.0,0.0]|
|2013-01-03|             0|     25|   2552|     899.0|           1|    1|  [1.0,899.0,0.0]|
|2013-01-06|             0|     25|   2554|   1709.05|           1|    1|[1.0,1709.05,0.0]|
|2013-01-15|             0|     25|   2555|    1099.0|           1|    1| [1.0,1099.0,0.0]|
|2013-01-10|             0|     25|   2564|     349.0|           1|    1|  [1.0,349.0,0.0]|
+----------+--------------+-------+-------+----------+------------+-----+-----------------+
only showing top 5 rows



In [21]:
gbt = GBTRegressor(featuresCol="features", labelCol='item_cnt_day', maxIter=10)
# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[assembler, gbt])

In [22]:
model = pipeline.fit(trainingData)

In [23]:
evaluator = RegressionEvaluator(
    labelCol="item_cnt_day", 
    predictionCol="prediction", 
    metricName="rmse"
)
train_pred = model.transform(trainingData)
test_pred = model.transform(testData)

test_pred.select("prediction", "item_cnt_day").show(5)

print("RMSE train data = %g" % evaluator.evaluate(train_pred))
print("RMSE test data = %g" % evaluator.evaluate(test_pred))

+------------------+------------+
|        prediction|item_cnt_day|
+------------------+------------+
| 1.181191524140303|           1|
|1.0293996116334725|           1|
| 1.290853487503798|           1|
| 1.181191524140303|           1|
| 1.181191524140303|           1|
+------------------+------------+
only showing top 5 rows

RMSE train data = 2.0582
RMSE test data = 5.06909


In [24]:
gbtModel = model.stages[1]
print(gbtModel)  # summary only

GBTRegressionModel: uid=GBTRegressor_a25b03a24986, numTrees=10, numFeatures=3
