In [None]:
import pyspark
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

In [None]:
sc = SparkContext()

In [None]:
spark = SparkSession.builder.appName('bakery_basket').getOrCreate()

In [None]:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import split, explode
import pyspark.sql.functions as f

In [None]:
# Pre-processing data
from pyspark.sql.functions import collect_list, col, count, collect_set
# Convert list array to string
from pyspark.sql.types import StringType

In [None]:
# Loads data.
data = spark.read.csv('75000/75000i.csv', header=False,
                      inferSchema=True)

In [None]:
df3 = spark.read.options(header='False', inferSchema='True', delimiter=';') \
  .csv('75000/75000-out1.csv')

In [None]:
df3.show(10)

+--------------------+
|                 _c0|
+--------------------+
|           1, 11, 21|
|    2, 7, 11, 37, 45|
|        3, 3, 33, 42|
|    4, 5, 12, 17, 47|
|        5, 6, 18, 42|
|         6, 2, 4, 34|
|   7, 15, 16, 23, 40|
|     8, 2, 3, 29, 34|
|9, 18, 23, 26, 35...|
|          10, 44, 45|
+--------------------+
only showing top 10 rows



In [None]:
df = split(df3['_c0'], ',')
df3 = df3.withColumn('order_id', df.getItem(0))
baskets =df3.select('order_id',
                      split(f.regexp_replace('_c0', '^[A-Za-z0-9]+(?=,),', ''),',').alias('items'))
baskets.createOrReplaceTempView("baskets")

In [None]:
baskets.show(10)

+--------+--------------------+
|order_id|               items|
+--------+--------------------+
|       1|          [ 11,  21]|
|       2| [ 7,  11,  37,  45]|
|       3|      [ 3,  33,  42]|
|       4| [ 5,  12,  17,  47]|
|       5|      [ 6,  18,  42]|
|       6|       [ 2,  4,  34]|
|       7|[ 15,  16,  23,  40]|
|       8|  [ 2,  3,  29,  34]|
|       9|[ 18,  23,  26,  ...|
|      10|          [ 44,  45]|
+--------+--------------------+
only showing top 10 rows



In [None]:
product_id = baskets.withColumn('product_id', explode(baskets['items']))
product_id[['order_id', 'product_id']].show(10)

+--------+----------+
|order_id|product_id|
+--------+----------+
|       1|        11|
|       1|        21|
|       2|         7|
|       2|        11|
|       2|        37|
|       2|        45|
|       3|         3|
|       3|        33|
|       3|        42|
|       4|         5|
+--------+----------+
only showing top 10 rows



In [None]:
product_id.createOrReplaceTempView("order_products")

In [None]:
fpGrowth = FPGrowth(itemsCol = "items", minSupport = 0.003, minConfidence = 0.003)
model = fpGrowth.fit(baskets)

In [None]:
model.freqItemsets.show()

+----------+----+
|     items|freq|
+----------+----+
|     [ 19]|5685|
|[ 19,  27]| 359|
|[ 19,  33]| 334|
| [ 19,  1]|2764|
|[ 19,  28]| 408|
|[ 19,  37]| 274|
|[ 19,  35]| 312|
|[ 19,  16]| 286|
| [ 19,  4]| 388|
|[ 19,  46]| 324|
|[ 19,  15]| 298|
| [ 19,  5]| 323|
|[ 19,  22]| 368|
|[ 19,  32]| 297|
|[ 19,  45]| 344|
|[ 19,  47]| 331|
| [ 19,  3]| 294|
|[ 19,  14]| 350|
|[ 19,  11]| 296|
| [ 19,  0]| 305|
+----------+----+
only showing top 20 rows



Danh sách các sản phẩm được mua nhiều nhất

In [None]:
mostPopularItemInABaskets = model.transform(baskets)

In [None]:
mostPopularItemInABaskets.show(3, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 order_id   | 1                                                                                                                                                                                                                                  
 items      | [ 11,  21]                                                                                                                                                                                                                         
 prediction | [ 19,  33,  27,  1,  28,  35,  16,  46,  4,  15,  22,  5,  32,  45,  47,  14,  0,  18,  49,  9,  44,  42,  7,  17,  29,  2,  40,  37,  3]                                                                                          
-RECORD 1-----------------------

In [None]:
from re import escape
product = spark.read.csv('75000/goods.csv', header=True, 
                      inferSchema=True)

In [None]:
product.show(5, truncate=True)

+---+------------+------+-----+------+
| Id|      Flavor|  Food|Price|  Type|
+---+------------+------+-----+------+
|  0| 'Chocolate'|'Cake'| 8.95|'Food'|
|  1|     'Lemon'|'Cake'| 8.95|'Food'|
|  2|    'Casino'|'Cake'|15.95|'Food'|
|  3|     'Opera'|'Cake'|15.95|'Food'|
|  4|'Strawberry'|'Cake'|11.95|'Food'|
+---+------------+------+-----+------+
only showing top 5 rows



In [None]:
product = product.withColumn('Flavor', f.regexp_replace('Flavor', "\'", "")).withColumn('Food', f.regexp_replace('Food', "\'", ""))
product.show(10)

+---+----------+------+-----+------+
| Id|    Flavor|  Food|Price|  Type|
+---+----------+------+-----+------+
|  0| Chocolate|  Cake| 8.95|'Food'|
|  1|     Lemon|  Cake| 8.95|'Food'|
|  2|    Casino|  Cake|15.95|'Food'|
|  3|     Opera|  Cake|15.95|'Food'|
|  4|Strawberry|  Cake|11.95|'Food'|
|  5|   Truffle|  Cake|15.95|'Food'|
|  6| Chocolate|Eclair| 3.25|'Food'|
|  7|    Coffee|Eclair|  3.5|'Food'|
|  8|   Vanilla|Eclair| 3.25|'Food'|
|  9|  Napoleon|  Cake|13.49|'Food'|
+---+----------+------+-----+------+
only showing top 10 rows



In [None]:
product.createOrReplaceTempView("products")

In [None]:
query = "select concat(trim(p.Flavor),' ', trim(p.Food)) as Flavor_Food_Name, o.order_id from products p inner join order_products o where o.product_id = p.Id"

rawData = spark.sql(query)

In [None]:
rawData.show()

+-------------------+--------+
|   Flavor_Food_Name|order_id|
+-------------------+--------+
|          Apple Pie|       1|
|     Ganache Cookie|       1|
|      Coffee Eclair|       2|
|          Apple Pie|       2|
|       Almond Twist|       2|
|         Hot Coffee|       2|
|         Opera Cake|       3|
|   Cheese Croissant|       3|
|       Orange Juice|       3|
|       Truffle Cake|       4|
|         Apple Tart|       4|
|     Chocolate Tart|       4|
|Vanilla Frappuccino|       4|
|   Chocolate Eclair|       5|
|        Cherry Tart|       5|
|       Orange Juice|       5|
|        Casino Cake|       6|
|    Strawberry Cake|       6|
|Chocolate Croissant|       6|
|    Blackberry Tart|       7|
+-------------------+--------+
only showing top 20 rows



In [None]:
baskets_1 = rawData.groupBy('order_id').agg(collect_set('Flavor_Food_Name').alias('items'))
baskets_1.sort('order_id').show(10, truncate=True)

+--------+--------------------+
|order_id|               items|
+--------+--------------------+
|       1|[Ganache Cookie, ...|
|      10|[Bottled Water, H...|
|     100|[Orange Juice, Ch...|
|    1000|[Apricot Croissan...|
|   10000|[Casino Cake, Cho...|
|   10001|[Bottled Water, B...|
|   10002|[Bottled Water, V...|
|   10003|[Truffle Cake, Go...|
|   10004|[Raspberry Cookie...|
|   10005|[Bottled Water, T...|
+--------+--------------------+
only showing top 10 rows



In [None]:
baskets_1.createOrReplaceTempView('baskets1')

In [None]:
baskets_1.head(3)

[Row(order_id='100', items=['Orange Juice', 'Cheese Croissant']),
 Row(order_id='1000', items=['Apricot Croissant', 'Raspberry Lemonade', 'Lemon Tart']),
 Row(order_id='10000', items=['Casino Cake', 'Chocolate Coffee', 'Chocolate Cake'])]

In [None]:
fpGrowth_1 = FPGrowth(itemsCol = "items", minSupport = 0.003, minConfidence = 0.003)
model_1= fpGrowth_1.fit(baskets_1)

In [None]:
model_1.freqItemsets.show(truncate=False)

+--------------------------------------------------+----+
|items                                             |freq|
+--------------------------------------------------+----+
|[Raspberry Lemonade]                              |5081|
|[Raspberry Lemonade, Lemon Tart]                  |253 |
|[Raspberry Lemonade, Cheese Croissant]            |310 |
|[Raspberry Lemonade, Marzipan Cookie]             |292 |
|[Raspberry Lemonade, Lemon Cake]                  |264 |
|[Raspberry Lemonade, Lemon Cookie]                |2087|
|[Raspberry Lemonade, Lemon Cookie, Lemon Lemonade]|1922|
|[Raspberry Lemonade, Tuile Cookie]                |303 |
|[Raspberry Lemonade, Almond Twist]                |235 |
|[Raspberry Lemonade, Apricot Danish]              |274 |
|[Raspberry Lemonade, Blueberry Tart]              |249 |
|[Raspberry Lemonade, Chocolate Coffee]            |257 |
|[Raspberry Lemonade, Strawberry Cake]             |304 |
|[Raspberry Lemonade, Blackberry Tart]             |237 |
|[Raspberry Le

In [None]:
mostPopularItemInABaskets_1 = model_1.transform(baskets_1)

In [None]:
mostPopularItemInABaskets_1.show(3, truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 order_id   | 100                                                                                                                                                                                                                               

In [None]:
from pyspark.sql.types import StringType

In [None]:
mostPopularItemInABaskets_1.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- items: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- prediction: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [None]:
mostPopularItemInABaskets_1.createOrReplaceTempView("new_view")

In [None]:
DF_cast = mostPopularItemInABaskets_1.select('order_id', 
                                            mostPopularItemInABaskets_1.items.cast(StringType()),
                                            mostPopularItemInABaskets_1.prediction.cast(StringType())) 

DF_cast.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- items: string (nullable = false)
 |-- prediction: string (nullable = true)



In [None]:
DF_cast.show(3, truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 order_id   | 100                                                                                                                                                                                                                               