In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [20]:
import pyspark.sql.functions as F

In [3]:
spark = SparkSession.builder.appName('FP-Growth').getOrCreate()

In [6]:
# import intercart Order dataset

# the list of products
products = spark.read.csv('data/products.csv', header=True, inferSchema=True)

products.show(5)

+----------+--------------------+--------+-------------+
|product_id|        product_name|aisle_id|department_id|
+----------+--------------------+--------+-------------+
|         1|Chocolate Sandwic...|      61|           19|
|         2|    All-Seasons Salt|     104|           13|
|         3|Robust Golden Uns...|      94|            7|
|         4|Smart Ones Classi...|      38|            1|
|         5|Green Chile Anyti...|       5|           13|
+----------+--------------------+--------+-------------+
only showing top 5 rows



In [7]:
products.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- aisle_id: string (nullable = true)
 |-- department_id: string (nullable = true)



In [16]:
# order information
orders = spark.read.csv('data/order_products_train.csv', header=True, inferSchema=True)

orders.show(5)

+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       1|     49302|                1|        1|
|       1|     11109|                2|        1|
|       1|     10246|                3|        0|
|       1|     49683|                4|        0|
|       1|     43633|                5|        1|
+--------+----------+-----------------+---------+
only showing top 5 rows



In [17]:
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- add_to_cart_order: integer (nullable = true)
 |-- reordered: integer (nullable = true)



In [18]:
# Order with product name by join function
order_with_name = orders.join(products, 'product_id')

order_with_name.show(5)

+----------+--------+-----------------+---------+--------------------+--------+-------------+
|product_id|order_id|add_to_cart_order|reordered|        product_name|aisle_id|department_id|
+----------+--------+-----------------+---------+--------------------+--------+-------------+
|     49302|       1|                1|        1|    Bulgarian Yogurt|     120|           16|
|     11109|       1|                2|        1|Organic 4% Milk F...|     108|           16|
|     10246|       1|                3|        0|Organic Celery He...|      83|            4|
|     49683|       1|                4|        0|      Cucumber Kirby|      83|            4|
|     43633|       1|                5|        1|Lightly Smoked Sa...|      95|           15|
+----------+--------+-----------------+---------+--------------------+--------+-------------+
only showing top 5 rows



In [24]:
orders_df = order_with_name.groupby('order_id').agg(F.collect_list('product_id').alias('items'))

orders_df.show(5)
orders_df.cache()

+--------+--------------------+
|order_id|               items|
+--------+--------------------+
|    1342|[13176, 30827, 14...|
|    1591|[17203, 44008, 48...|
|    4519|             [29270]|
|    4935|             [45190]|
|    6357|[37524, 33731, 43...|
+--------+--------------------+
only showing top 5 rows



DataFrame[order_id: int, items: array<int>]

In [22]:
# import MLlib ; FPGrowth
from pyspark.ml.fpm import FPGrowth

In [31]:
fpGrowth = FPGrowth(itemsCol='items', minSupport=0.001, minConfidence=0.5)

In [32]:
model = fpGrowth.fit(orders_df)

In [28]:
model.freqItemsets.show(10)

+--------------+-----+
|         items| freq|
+--------------+-----+
|       [24852]|18726|
|       [13176]|15480|
|       [21137]|10894|
|[21137, 13176]| 3074|
|[21137, 24852]| 2174|
|       [21903]| 9784|
|[21903, 21137]| 1639|
|[21903, 13176]| 2236|
|[21903, 24852]| 2000|
|       [47626]| 8135|
+--------------+-----+
only showing top 10 rows



In [33]:
model.associationRules.show()

+--------------------+----------+------------------+------------------+
|          antecedent|consequent|        confidence|              lift|
+--------------------+----------+------------------+------------------+
|       [4605, 16797]|   [24852]|0.5357142857142857|3.7536332219526702|
|[27966, 47209, 21...|   [13176]|0.5984251968503937| 5.072272070642333|
|       [9839, 47209]|   [13176]|0.5048231511254019| 4.278897986822536|
|       [8174, 47209]|   [13176]|0.5283018867924528| 4.477904539027839|
|      [22825, 47209]|   [13176]|0.5170454545454546|4.3824946411792345|
|      [39928, 47209]|   [13176]|0.5459770114942529| 4.627719489738336|
|       [8174, 27966]|   [13176]|0.5412186379928315| 4.587387356098284|
|[30391, 47209, 21...|   [13176]|          0.546875| 4.635330870478036|
|      [27966, 47209]|   [13176]| 0.521099116781158| 4.416853618458589|
|      [35951, 47209]|   [13176]|0.5141065830721003| 4.357584667849303|
|      [22035, 47209]|   [13176]|0.5314685314685315| 4.504745125

In [35]:
model.transform(orders_df).show()

+--------+--------------------+----------+
|order_id|               items|prediction|
+--------+--------------------+----------+
|    1342|[13176, 30827, 14...|        []|
|    1591|[17203, 44008, 48...|        []|
|    4519|             [29270]|        []|
|    4935|             [45190]|        []|
|    6357|[37524, 33731, 43...|        []|
|   10362|[22451, 46823, 47...|        []|
|   19204|[25783, 26165, 45...|        []|
|   29601|[4472, 44329, 271...|        []|
|   31035|[13176, 8174, 407...|        []|
|   40011|[27292, 7905, 219...|        []|
|   46266|[23029, 38558, 34...|        []|
|   51607|[41390, 42752, 14...|        []|
|   58797|[13176, 3265, 983...|        []|
|   61793|[43352, 8859, 196...|        []|
|   67089|[24852, 47766, 27...|        []|
|   70863|[34791, 42049, 17...|        []|
|   88674|[6347, 16262, 220...|        []|
|   91937|[25567, 20708, 38...|        []|
|   92317|[24852, 41005, 46...|        []|
|   99621|[43789, 38266, 40...|        []|
+--------+-