# Finding frequent items


## Initialization

In [None]:
# Make the user's Google Drive visible to this Colab runtime; the dataset path used below lives under it.
from google.colab import drive

drive.mount(mountpoint='/content/drive/')

Mounted at /content/drive/


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, collect_set
from itertools import combinations
from collections import defaultdict
import itertools
from pyspark.sql import Row
from pyspark.rdd import RDD

In [None]:
# Create (or reuse) the Spark session for this notebook, giving the driver 2 GB of memory.
spark = (
    SparkSession.builder
    .appName("PCY Algorithm")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

## Read the input file

In [None]:
file_path = "/content/drive/My Drive/Big-Midterm/baskets.csv"

# Use the header row as column names and let Spark infer the column types.
df = spark.read.options(header=True, inferSchema=True).csv(file_path)
df.show(n=5)

+-------------+----------+----------------+----+-----+---+-----------+
|Member_number|      Date| itemDescription|year|month|day|day_of_week|
+-------------+----------+----------------+----+-----+---+-----------+
|         1249|01/01/2014|    citrus fruit|2014|    1|  1|          2|
|         1249|01/01/2014|          coffee|2014|    1|  1|          2|
|         1381|01/01/2014|            curd|2014|    1|  1|          2|
|         1381|01/01/2014|            soda|2014|    1|  1|          2|
|         1440|01/01/2014|other vegetables|2014|    1|  1|          2|
+-------------+----------+----------------+----+-----+---+-----------+
only showing top 5 rows



## Preprocessing

### Group items into baskets (one basket per member per date)

In [None]:
# A basket is the set of distinct items one member bought on one date.
df_basket = (
    df.groupBy("Member_number", "Date")
    .agg(collect_set(col("itemDescription")).alias("basket"))
)

df_basket.show(n=5, truncate=False)

+-------------+----------+--------------------------------------------------+
|Member_number|Date      |basket                                            |
+-------------+----------+--------------------------------------------------+
|1000         |15/03/2015|[whole milk, sausage, yogurt, semi-finished bread]|
|1000         |24/06/2014|[pastry, whole milk, salty snack]                 |
|1000         |24/07/2015|[misc. beverages, canned beer]                    |
|1000         |25/11/2015|[sausage, hygiene articles]                       |
|1000         |27/05/2015|[pickled vegetables, soda]                        |
+-------------+----------+--------------------------------------------------+
only showing top 5 rows



## PCY

### Initialize PCY

In [None]:
class HashBucket:
    """
    Fixed-size array of counters that PCY uses to tally hashed item pairs.

    A pair is mapped to a slot by XOR-ing the built-in hashes of its two
    items and reducing modulo the number of slots. XOR is commutative, so
    (a, b) and (b, a) always land in the same slot.
    """
    def __init__(self, num_buckets):
        self.num_buckets = num_buckets
        # One zeroed counter per slot.
        self.buckets = [0 for _ in range(num_buckets)]

    def hash_function(self, item1, item2):
        """
        Return the slot index for the pair (item1, item2).
        """
        combined = hash(item1) ^ hash(item2)
        return combined % self.num_buckets

    def increment_bucket(self, item1, item2):
        """
        Add one to the counter of the slot that (item1, item2) hashes to.
        """
        self.buckets[self.hash_function(item1, item2)] += 1

    def is_frequent(self, item1, item2, threshold):
        """
        Return True when the slot for (item1, item2) holds at least `threshold` hits.
        """
        slot_count = self.buckets[self.hash_function(item1, item2)]
        return threshold <= slot_count

In [None]:
class PCY:
    """
    Implementation of the PCY (Park-Chen-Yu) algorithm for frequent itemset mining.

    `baskets` may be a PySpark RDD, which is processed on the cluster, or any
    local iterable of item collections, which is processed on the driver.
    Every basket is treated as a set: duplicate items are ignored, and each
    pair is stored with its items in sorted order. This means (a, b) and
    (b, a) are counted as the same pair, no matter what order the basket
    lists them in.

    Typical use: first_pass -> second_pass -> generate_association_rules,
    or simply run().
    """
    def __init__(self, min_support, num_buckets, min_confidence, spark):
        """
        Initializes the PCY algorithm with minimum support, hash bucket count, and confidence threshold.

        Args:
            min_support: minimum count an item, a hash bucket, or a pair must reach.
            num_buckets: number of hash buckets used to count pairs in the first pass.
            min_confidence: minimum confidence for an association rule to be kept.
            spark: SparkSession used to build the result DataFrames.
        """
        self.min_support = min_support
        self.min_confidence = min_confidence
        self.num_buckets = num_buckets
        self.hash_table = HashBucket(num_buckets)  # Create a hash bucket instance
        self.spark = spark  # Store Spark session for DataFrame operations
        # Filled in by first_pass / second_pass. They start empty so that
        # calling a later pass too early finds nothing instead of raising
        # AttributeError.
        self.frequent_items = {}
        self.frequent_pairs = {}

    @staticmethod
    def _canonical(basket):
        """
        Return the distinct items of `basket` in sorted order.

        Gives every pair a single canonical orientation.
        """
        return sorted(set(basket))

    def first_pass(self, baskets):
        """
        First pass of PCY: count item frequencies and hash every pair into buckets.

        Starts from a fresh hash table, so calling this again does not
        accumulate stale bucket counts.

        Returns:
            dict mapping each item whose count is >= min_support to that count.
        """
        self.hash_table = HashBucket(self.num_buckets)
        # The Spark closures below must not capture `self`: it holds the
        # SparkSession, which cannot be shipped to executors.
        hash_table = self.hash_table
        min_support = self.min_support
        canonical = self._canonical

        if isinstance(baskets, RDD):  # If input is an RDD
            normalized = baskets.map(canonical)
            frequent_items = normalized.flatMap(lambda items: [(item, 1) for item in items]) \
                                       .reduceByKey(lambda a, b: a + b) \
                                       .filter(lambda kv: kv[1] >= min_support) \
                                       .collectAsMap()
            # Bucket indices are computed on the executors. second_pass also
            # evaluates them there, so both passes hash in the same kind of
            # process. The driver's str hash seed may differ from the
            # executors' (NOTE(review): relies on PySpark giving executors a
            # consistent PYTHONHASHSEED).
            bucket_counts = normalized.flatMap(
                lambda items: [(hash_table.hash_function(a, b), 1) for a, b in combinations(items, 2)]
            ).reduceByKey(lambda a, b: a + b).collectAsMap()
            for index, count in bucket_counts.items():
                hash_table.buckets[index] = count
        else:
            item_counts = defaultdict(int)
            for basket in baskets:
                items = canonical(basket)
                for item in items:
                    item_counts[item] += 1  # Count item occurrences
                for item1, item2 in combinations(items, 2):
                    hash_table.increment_bucket(item1, item2)  # Hash item pairs
            frequent_items = {item: count for item, count in item_counts.items() if count >= min_support}

        self.frequent_items = frequent_items
        return frequent_items

    def second_pass(self, baskets):
        """
        Second pass of PCY: count candidate pairs and keep the frequent ones.

        A pair is a candidate only if both items were frequent in first_pass
        and the pair's hash bucket reached min_support.

        Returns:
            dict mapping each sorted (item1, item2) pair whose count is
            >= min_support to that count.
        """
        # Plain locals, so the Spark closure does not capture `self`.
        hash_table = self.hash_table
        min_support = self.min_support
        frequent_items = self.frequent_items
        canonical = self._canonical

        def candidate_pairs(basket):
            # Drop infrequent items first. Filtering keeps the sorted order,
            # so every pair is emitted in canonical orientation.
            items = [item for item in canonical(basket) if item in frequent_items]
            return [(item1, item2) for item1, item2 in combinations(items, 2)
                    if hash_table.is_frequent(item1, item2, min_support)]

        if isinstance(baskets, RDD):
            frequent_pairs = baskets.flatMap(lambda basket: [(pair, 1) for pair in candidate_pairs(basket)]) \
                                    .reduceByKey(lambda a, b: a + b) \
                                    .filter(lambda kv: kv[1] >= min_support) \
                                    .collectAsMap()
        else:
            pair_counts = defaultdict(int)
            for basket in baskets:
                for pair in candidate_pairs(basket):
                    pair_counts[pair] += 1  # Count frequent pairs
            frequent_pairs = {pair: count for pair, count in pair_counts.items() if count >= min_support}

        self.frequent_pairs = frequent_pairs
        return frequent_pairs

    def generate_association_rules(self, baskets):
        """
        Generate association rules A -> B and B -> A from the frequent pairs.

        The confidence of A -> B is count(A, B) / count(A), where count(X) is
        the number of baskets that contain X.

        Returns:
            DataFrame with columns antecedent, consequent and confidence. It is
            empty, with that explicit schema, when no rule reaches
            min_confidence.
        """
        if isinstance(baskets, RDD):
            item_counts = baskets.flatMap(lambda basket: [(item, 1) for item in set(basket)]) \
                                 .reduceByKey(lambda a, b: a + b) \
                                 .collectAsMap()
        else:
            item_counts = defaultdict(int)
            for basket in baskets:
                for item in set(basket):
                    item_counts[item] += 1  # Count individual items

        rules = []
        for (A, B), support_AB in self.frequent_pairs.items():
            confidence_AB = support_AB / item_counts[A]
            confidence_BA = support_AB / item_counts[B]
            if confidence_AB >= self.min_confidence:
                rules.append(Row(antecedent=A, consequent=B, confidence=confidence_AB))
            if confidence_BA >= self.min_confidence:
                rules.append(Row(antecedent=B, consequent=A, confidence=confidence_BA))

        if not rules:
            # createDataFrame cannot infer a schema from an empty list.
            return self.spark.createDataFrame([], "antecedent string, consequent string, confidence double")
        return self.spark.createDataFrame(rules)  # Convert to DataFrame

    def debug_first_pass(self, baskets):
        """
        Debug method: Runs the first pass and prints key results.
        """
        print("[DEBUG] Starting first pass...")
        frequent_items = self.first_pass(baskets)
        print(f"===============================\nTotal frequent items: {len(frequent_items)}")
        rows = [Row(item=item, count=count) for item, count in frequent_items.items()]
        if not rows:
            return self.spark.createDataFrame([], "item string, count bigint")
        return self.spark.createDataFrame(rows)

    def debug_second_pass(self, baskets):
        """
        Debug method: Runs the second pass and prints key results.
        """
        print("[DEBUG] Starting second pass...")
        frequent_pairs = self.second_pass(baskets)
        print(f"===============================\nTotal frequent pairs: {len(frequent_pairs)}")
        rows = [Row(item1=p[0], item2=p[1], count=count) for p, count in frequent_pairs.items()]
        if not rows:
            return self.spark.createDataFrame([], "item1 string, item2 string, count bigint")
        return self.spark.createDataFrame(rows)

    def debug_generate_association_rules(self, baskets):
        """
        Debug method: Generates association rules and prints key results.
        """
        print("[DEBUG] Generating association rules...")
        rules_df = self.generate_association_rules(baskets)
        print(f"===============================\nTotal rules: {rules_df.count()}")
        return rules_df

    def run(self, baskets):
        """
        Runs the entire PCY algorithm: first pass, second pass, and rule generation.
        """
        self.first_pass(baskets)
        self.second_pass(baskets)
        return self.generate_association_rules(baskets)


### Debug

In [None]:
# Bring every basket (the collected list of item names) back to the driver as a Python list.
baskets = [row["basket"] for row in df_basket.select("basket").collect()]

In [None]:
pcy = PCY(spark=spark, min_support=20, num_buckets=500, min_confidence=0.1)


def _run_stage(title, stage):
    """Print a section title, run one PCY debug stage on `baskets`, show and return its DataFrame."""
    print(title)
    frame = stage(baskets)
    frame.show()
    return frame


df_frequent_items = _run_stage("=== First Pass ===", pcy.debug_first_pass)
df_frequent_pairs = _run_stage("=== Second Pass ===", pcy.debug_second_pass)
df_rules = _run_stage("=== Association Rules ===", pcy.debug_generate_association_rules)


=== First Pass ===
[DEBUG] Starting first pass...
Total frequent items: 141
+-------------------+-----+
|               item|count|
+-------------------+-----+
|         whole milk| 2363|
|            sausage|  903|
|             yogurt| 1285|
|semi-finished bread|  142|
|             pastry|  774|
|        salty snack|  281|
|    misc. beverages|  236|
|        canned beer|  702|
|   hygiene articles|  205|
| pickled vegetables|  134|
|               soda| 1453|
|         rolls/buns| 1646|
|        frankfurter|  565|
|               curd|  504|
|               beef|  508|
|        white bread|  359|
| whipped/sour cream|  654|
|  frozen vegetables|  419|
|   other vegetables| 1827|
|             butter|  527|
+-------------------+-----+
only showing top 20 rows

=== Second Pass ===
[DEBUG] Starting second pass...
Total frequent pairs: 407
+-----------------+-------------------+-----+
|            item1|              item2|count|
+-----------------+-------------------+-----+
|       wh