In [36]:
from pyspark.sql import SparkSession, functions as f
from pyspark.ml.fpm import FPGrowth

In [37]:
spark = SparkSession.builder.appName('walmart-data').master('local').getOrCreate()

In [38]:
product_schema = 'product_id INT, product_name STRING, aisle_id INT, department_id INT'
order_product_schema = 'order_id INT, product_id INT, add_to_cart_order INT, reordered INT'

In [39]:
df_products = spark.read.option("header", True).schema(product_schema).csv('./datasets/products.csv').select('product_id')
df_products.show(5)

+----------+
|product_id|
+----------+
|         1|
|         2|
|         3|
|         4|
|         5|
+----------+


In [40]:
df_order_products = spark.read.option("header", True).schema(order_product_schema).csv(
    './datasets/order_products__prior.csv').drop("add_to_cart_order", "reordered").dropDuplicates()
df_order_products.show(5)

+--------+----------+
|order_id|product_id|
+--------+----------+
|      27|     47766|
|      83|     13292|
|      93|     47226|
|      95|     48287|
|     106|     47144|
+--------+----------+


In [41]:
df_transactions = df_order_products.groupBy('order_id').agg(f.collect_set('product_id').alias('items_transactions'))
df_transactions.show(truncate=False)

+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|order_id|items_transactions                                                                                                                                                                        |
+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2       |[17794, 1819, 40141, 9327, 28985, 45918, 30035, 33120, 43668]                                                                                                                             |
|3       |[32665, 33754, 17461, 17704, 21903, 17668, 24838, 46667]                                                                                                                                  |
|4       |

In [42]:
min_support = 0.001
min_confidence= 0.001
fp = FPGrowth(minSupport=min_support, minConfidence=min_confidence, itemsCol='items_transactions')
model = fp.fit(df_transactions)
model.freqItemsets.show(truncate=False)

+--------------+----+
|items         |freq|
+--------------+----+
|[46802]       |118 |
|[46802, 27966]|16  |
|[46802, 24489]|15  |
|[46802, 47209]|20  |
|[46802, 13176]|24  |
|[46802, 21903]|29  |
|[46802, 24852]|27  |
|[32650]       |29  |
|[31964]       |42  |
|[27509]       |22  |
|[13870]       |112 |
|[13870, 21137]|20  |
|[13870, 13176]|23  |
|[13870, 24852]|28  |
|[9741]        |29  |
|[36316]       |42  |
|[36316, 47977]|19  |
|[28465]       |82  |
|[28465, 24799]|27  |
|[28465, 24852]|24  |
+--------------+----+


In [43]:
model.associationRules.show()

+--------------+----------+-------------------+------------------+--------------------+
|    antecedent|consequent|         confidence|              lift|             support|
+--------------+----------+-------------------+------------------+--------------------+
|       [43772]|   [13176]| 0.1485148514851485| 1.238368820159608|0.001002740824921452|
|       [43772]|   [24852]|0.22772277227722773| 1.513329609282563|0.001537535931546...|
|       [14084]|   [21137]|  0.189873417721519| 2.421412153193694|0.001002740824921452|
|       [14084]|   [13176]|0.22784810126582278|1.8998772278904363|0.001203288989905...|
|       [14084]|   [24852]|0.27848101265822783|1.8506430334759796|0.001470686543218...|
|[39275, 24852]|   [27966]|0.23529411764705882| 5.456999544003648|0.001069590213249...|
|[39275, 24852]|   [21137]|0.22058823529411764|2.8131111779750264|0.001002740824921452|
|        [1463]|   [21137]|0.15447154471544716|1.9699401853353573|0.001270138378233839|
|        [1463]|   [47209]|0.154

In [44]:
df_rules = model.associationRules.withColumn('conviction', (1-f.col('support'))/(1-f.col('confidence')))

In [46]:
df_rules.orderBy('conviction', ascending=False).show()

+--------------+----------+------------------+------------------+--------------------+------------------+
|    antecedent|consequent|        confidence|              lift|             support|        conviction|
+--------------+----------+------------------+------------------+--------------------+------------------+
| [4962, 37718]|   [38544]|0.8823529411764706| 377.1176470588235|0.001002740824921452| 8.491476702988166|
|[14947, 35221]|   [44632]|0.8095238095238095|32.036155202821874|0.001136439601577...| 5.244033692091718|
| [4962, 38544]|   [37718]|              0.75|            320.55|0.001002740824921452| 3.995989036700314|
|[37718, 38544]|    [4962]|0.7142857142857143|333.90625000000006|0.001002740824921452| 3.496490407112775|
| [33787, 4957]|   [33754]|               0.7| 73.74154929577463|0.001403837154890...| 3.328653876150366|
|[41787, 28204]|   [24852]|0.6896551724137931| 4.583097167542395|0.001336987766561...|3.2179141505299675|
|[21709, 14947]|   [44632]|0.6818181818181818|