In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [3]:
sc = SparkContext()

In [4]:
spark = SparkSession(sc)

### Read the dataset

In [5]:
data = spark.read.csv('Du_lieu_cung_cap/75000/75000i.csv')

In [6]:
data.show(10)

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  1| 21|
|  1|  5| 11|
|  2|  1|  7|
|  2|  3| 11|
|  2|  4| 37|
|  2|  3| 45|
|  3|  5|  3|
|  3|  3| 42|
|  3|  3| 33|
|  4|  1|  5|
+---+---+---+
only showing top 10 rows



In [7]:
data = (data
        .withColumnRenamed('_c0', 'order_id')
        .withColumnRenamed('_c1', 'quantity')
        .withColumnRenamed('_c2', 'product_id')
       )

In [8]:
data.show(10)

+--------+--------+----------+
|order_id|quantity|product_id|
+--------+--------+----------+
|       1|       1|        21|
|       1|       5|        11|
|       2|       1|         7|
|       2|       3|        11|
|       2|       4|        37|
|       2|       3|        45|
|       3|       5|         3|
|       3|       3|        42|
|       3|       3|        33|
|       4|       1|         5|
+--------+--------+----------+
only showing top 10 rows



In [9]:
data.count()

266209

In [10]:
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|          order_id|          quantity|        product_id|
+-------+------------------+------------------+------------------+
|  count|            266209|            266209|            266209|
|   mean|37506.447599442545|3.0004620429812667| 24.28233455668291|
| stddev|21665.459090368462|1.4135532622940965|14.771318203617861|
|    min|                 1|                 1|                 0|
|    max|              9999|                 5|                 9|
+-------+------------------+------------------+------------------+



### Build model

In [11]:
from pyspark.ml.fpm import FPGrowth

In [12]:
# Pre-processing data
from pyspark.sql.functions import collect_list, col, count, collect_set
# Convert list array to string
from pyspark.sql.types import StringType

In [13]:
data.createOrReplaceTempView('order_products_train')

In [14]:
products = spark.sql('select distinct product_id from order_products_train')
products.count()

50

In [15]:
rawData = spark.sql('select * from order_products_train')
baskets = rawData.groupBy('order_id').agg(collect_set('product_id').alias('items'))
baskets.createOrReplaceTempView('baskets')

In [16]:
baskets.show(5, truncate=False)

+--------+--------------------+
|order_id|items               |
+--------+--------------------+
|100     |[42, 33]            |
|10000   |[46, 2, 0]          |
|10001   |[44, 27, 32, 14, 48]|
|10009   |[25, 22, 43]        |
|10010   |[17, 29, 47, 19]    |
+--------+--------------------+
only showing top 5 rows



In [17]:
type(baskets)

pyspark.sql.dataframe.DataFrame

In [18]:
FpGrowth = FPGrowth(itemsCol='items', minSupport=0.005, minConfidence=0.03)
model = FpGrowth.fit(baskets)

In [19]:
# Display frequent items
model.freqItemsets.show(5)

+------------+----+
|       items|freq|
+------------+----+
|        [26]|3179|
|        [40]|5118|
|        [43]|4685|
|    [43, 41]|1719|
|[43, 41, 24]|1570|
+------------+----+
only showing top 5 rows



In [20]:
# transform examines the input items against all the association rules and summarize the consequents as prediction
mostPopularItemInABasket = model.transform(baskets)

In [21]:
mostPopularItemInABasket.show(3, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------
 order_id   | 100                                                  
 items      | [42, 33]                                             
 prediction | [27, 28, 5, 14, 35, 4, 18, 44, 22, 32, 47, 19, 1]    
-RECORD 1----------------------------------------------------------
 order_id   | 10000                                                
 items      | [46, 2, 0]                                           
 prediction | [28]                                                 
-RECORD 2----------------------------------------------------------
 order_id   | 10001                                                
 items      | [44, 27, 32, 14, 48]                                 
 prediction | [9, 33, 28, 4, 22, 42, 1, 45, 16, 31, 12, 36, 18, 5] 
only showing top 3 rows



### Use product_name instead of product_id

In [22]:
product_data = spark.read.csv('Du_lieu_cung_cap/75000/goods.csv', header=True,
                      inferSchema=True)

In [23]:
product_data.show(5, truncate=False)

+---+------------+------+-----+------+
|Id |Flavor      |Food  |Price|Type  |
+---+------------+------+-----+------+
|0  |'Chocolate' |'Cake'|8.95 |'Food'|
|1  |'Lemon'     |'Cake'|8.95 |'Food'|
|2  |'Casino'    |'Cake'|15.95|'Food'|
|3  |'Opera'     |'Cake'|15.95|'Food'|
|4  |'Strawberry'|'Cake'|11.95|'Food'|
+---+------------+------+-----+------+
only showing top 5 rows



In [24]:
from pyspark.sql import functions as F

In [25]:
# Concate column Flavor and Food into Flavor_and_Food
product_data = product_data.withColumn('Flavor_and_Food', 
                    F.concat(F.col('Flavor'),F.lit(' '), F.col('Food')))
product_data = product_data.select('Id', 'Flavor_and_Food', 'Price', 'Type')
product_data.show(5, truncate=False)

+---+-------------------+-----+------+
|Id |Flavor_and_Food    |Price|Type  |
+---+-------------------+-----+------+
|0  |'Chocolate' 'Cake' |8.95 |'Food'|
|1  |'Lemon' 'Cake'     |8.95 |'Food'|
|2  |'Casino' 'Cake'    |15.95|'Food'|
|3  |'Opera' 'Cake'     |15.95|'Food'|
|4  |'Strawberry' 'Cake'|11.95|'Food'|
+---+-------------------+-----+------+
only showing top 5 rows



In [26]:
product_data = product_data.withColumn('Flavor_and_Food', F.regexp_replace('Flavor_and_Food', "\'", ''))
product_data.show()

+---+----------------+-----+------+
| Id| Flavor_and_Food|Price|  Type|
+---+----------------+-----+------+
|  0|  Chocolate Cake| 8.95|'Food'|
|  1|      Lemon Cake| 8.95|'Food'|
|  2|     Casino Cake|15.95|'Food'|
|  3|      Opera Cake|15.95|'Food'|
|  4| Strawberry Cake|11.95|'Food'|
|  5|    Truffle Cake|15.95|'Food'|
|  6|Chocolate Eclair| 3.25|'Food'|
|  7|   Coffee Eclair|  3.5|'Food'|
|  8|  Vanilla Eclair| 3.25|'Food'|
|  9|   Napoleon Cake|13.49|'Food'|
| 10|     Almond Tart| 3.75|'Food'|
| 11|       Apple Pie| 5.25|'Food'|
| 12|      Apple Tart| 3.25|'Food'|
| 13|    Apricot Tart| 3.25|'Food'|
| 14|      Berry Tart| 3.25|'Food'|
| 15| Blackberry Tart| 3.25|'Food'|
| 16|  Blueberry Tart| 3.25|'Food'|
| 17|  Chocolate Tart| 3.75|'Food'|
| 18|     Cherry Tart| 3.25|'Food'|
| 19|      Lemon Tart| 3.25|'Food'|
+---+----------------+-----+------+
only showing top 20 rows



In [27]:
product_data.createOrReplaceTempView('products')

In [28]:
query = "select p.Flavor_and_Food, o.order_id from products p inner join order_products_train o where o.product_id = p.Id"
rawData_1 = spark.sql(query)
baskets_1 = rawData_1.groupBy('order_id').agg(collect_set('Flavor_and_Food').alias('items'))
baskets_1.createOrReplaceTempView('baskets1')

In [29]:
baskets_1.head(3)

[Row(order_id='10000', items=['Casino Cake', 'Chocolate Coffee', 'Chocolate Cake']),
 Row(order_id='10009', items=['Gongolais Cookie', 'Green Tea', 'Chocolate Meringue']),
 Row(order_id='10010', items=['Vanilla Frappuccino', 'Walnut Cookie', 'Chocolate Tart', 'Lemon Tart'])]

In [30]:
FpGrowth_1 = FPGrowth(itemsCol='items', minSupport=0.005, minConfidence=0.03)
model_1 = FpGrowth_1.fit(baskets_1)

In [31]:
# Display frequent itemsets
model_1.freqItemsets.show(truncate=False)

+--------------------------------------------------+----+
|items                                             |freq|
+--------------------------------------------------+----+
|[Raspberry Lemonade]                              |5081|
|[Raspberry Lemonade, Lemon Cookie]                |2087|
|[Raspberry Lemonade, Lemon Cookie, Lemon Lemonade]|1922|
|[Raspberry Lemonade, Lemon Lemonade]              |2088|
|[Marzipan Cookie]                                 |6733|
|[Marzipan Cookie, Tuile Cookie]                   |3819|
|[Marzipan Cookie, Strawberry Cake]                |409 |
|[Marzipan Cookie, Gongolais Cookie]               |420 |
|[Marzipan Cookie, Orange Juice]                   |387 |
|[Almond Bear Claw]                                |3183|
|[Almond Twist]                                    |5790|
|[Almond Twist, Hot Coffee]                        |2319|
|[Almond Twist, Hot Coffee, Coffee Eclair]         |2109|
|[Almond Twist, Coffee Eclair]                     |2784|
|[Ganache Cook

In [32]:
# transform examines the input items against all the association rules and summarize the consequents as prediction
mostPopularItemInABasket_1 = model_1.transform(baskets_1)

In [33]:
mostPopularItemInABasket_1.show(3, truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 order_id   | 10000                                                                                                                                                                                   
 items      | [Casino Cake, Chocolate Coffee, Chocolate Cake]                                                                                                                                         
 prediction | [Tuile Cookie]                                                                                                                                                                          
-RECORD 1---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 orde

- As we can see with the most popular items in the baskets, if you bought cake/coffee/tea you would most likely to buy another type of cake/croissant/cookie/tart or juice/lemonade