### ecom style proposal
For all orders containing multiple products, find the top 5 most common product pairs that are frequently bought together.
Do it in an optimized way that avoids cartesian explosion!

**collect_set, tuple, struct are used**


In [0]:
from pyspark.sql.functions import col, count, asc, array, collect_set, explode, struct,concat
from itertools import combinations

from pyspark.sql.types import ArrayType,StructType,StructField, StringType
from pyspark.sql.functions import udf

trans = [
    ("O001", "P001"), ("O001", "P002"), ("O001", "P003"),
    ("O002", "P001"), ("O002", "P002"),
    ("O003", "P004"),
    ("O004", "P001"), ("O004", "P003")
]

dftrans = spark.createDataFrame(trans, ["order_id", "product_id"])

dftrans.show()

+--------+----------+
|order_id|product_id|
+--------+----------+
|    O001|      P001|
|    O001|      P002|
|    O001|      P003|
|    O002|      P001|
|    O002|      P002|
|    O003|      P004|
|    O004|      P001|
|    O004|      P003|
+--------+----------+



In [0]:
dforders = dftrans.groupBy("order_id") \
    .agg(collect_set("product_id").alias("products"))

dforders.show()

+--------+------------------+
|order_id|          products|
+--------+------------------+
|    O001|[P002, P003, P001]|
|    O002|      [P002, P001]|
|    O003|            [P004]|
|    O004|      [P003, P001]|
+--------+------------------+



In [0]:
def generate_pairs(products):
    return [tuple(sorted(p)) for p in combinations(products,2)] if len(products)>=2 else[]
    # return [tuple(p) for p in combinations(products,2)] if len(products)>=2 else[]

In [0]:
pair_udf = udf(generate_pairs, ArrayType(StructType([StructField("_1", StringType()),
                                                     StructField("_2", StringType())])))

In [0]:
# dftest =dforders.withColumn("pairs", pair_udf("products")).select(explode("pairs").alias("pair"))
# dftest.show()

In [0]:
dfpair=dforders.withColumn("pairs", pair_udf("products")).select(explode("pairs").alias("pair"))
dfpair=dfpair.select(col("pair._1").alias("product_1"), col("pair._2").alias("product_2"))`
dfpair.show()

+---------+---------+
|product_1|product_2|
+---------+---------+
|     P002|     P003|
|     P001|     P002|
|     P001|     P003|
|     P001|     P002|
|     P001|     P003|
+---------+---------+



In [0]:
dfresult=dfpair\
        .withColumn("product_pair",struct(col("product_1"), col("product_2")))\
            .groupBy("product_1", "product_2").agg(count("*").alias("count")).orderBy(col("count")\
                .desc())
# dfresult.show()

In [0]:
dfresult = dfresult.withColumn("product_pair",struct(col("product_1"), col("product_2"))).drop(col("product_1"), col("product_2"))
dfresult.show()

+-----+------------+
|count|product_pair|
+-----+------------+
|    2|{P001, P003}|
|    2|{P001, P002}|
|    1|{P002, P003}|
+-----+------------+

