Prerequisites
- Download Java 17 from: https://www.oracle.com/it/java/technologies/javase/jdk17-archive-downloads.html
- Use Python 3.10
- Install the dependencies listed in requirements.txt

Possible datasets
- Toy Store E-Commerce Database: [Maven+Fuzzy+Factory](https://mavenanalytics.io/data-playground)
- Online Retail II: [online_retail_II.xlsx](https://archive.ics.uci.edu/dataset/502/online+retail+ii)
- Retail Data: [new_retail_data.csv](https://www.kaggle.com/datasets/sahilprajapati143/retail-analysis-large-dataset)
- Microsoft Contoso Retail Data: [ContosoRetailData](https://www.kaggle.com/datasets/bhanuthakurr/cleaned-contoso-dataset?resource=download)


In [161]:
import os

os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

# Check the Java version
import subprocess
print(subprocess.run(["java", "-version"], capture_output=True, text=True).stderr)

import findspark
findspark.init()

java version "17.0.12" 2024-07-16 LTS
Java(TM) SE Runtime Environment (build 17.0.12+8-LTS-286)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.12+8-LTS-286, mixed mode, sharing)



In [162]:
!echo $JAVA_HOME
!java -version
!javac -version

/Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home
java version "17.0.12" 2024-07-16 LTS
Java(TM) SE Runtime Environment (build 17.0.12+8-LTS-286)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.12+8-LTS-286, mixed mode, sharing)
javac 17.0.12


# Data mining project
The aim of the project is to create a scalable framework for mining data from a retail store that works in a dynamic environment, using streaming algorithms or periodically retrained models.

The section is divided into the following steps:
- An initial exploratory data analysis (EDA) to understand:
    - the structure of the data
    - data quality
    - data types
- The database contains three types of datasets:
    - Customer Data
    - Purchase Data
    - Product Data
- Algorithms to apply to the data:
    - Clustering: scalable clustering algorithms such as BIRCH or CURE, which allow streaming segmentation of the data.
    - Market basket analysis: using the Apriori algorithm, we extract frequent itemsets and association rules to uncover recurrent purchasing patterns. These rules can be leveraged to identify product bundles, support cross-selling strategies, and serve as a rule-based foundation for a recommendation system.
    - Recommendation system: collaborative filtering with UV decomposition (ALS in Spark) to discover hidden latent factors.
    The whole analysis is carried out over different time frames, to see how the clusters change over time.
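
As a minimal illustration of what streaming segmentation means, the sketch below uses sequential (online) k-means, a simpler technique swapped in here for BIRCH or CURE and not the project's method: each arriving point is assigned to its nearest centroid, and that centroid is updated as the running mean of the points assigned to it. The function `online_kmeans` and the toy 2-D data are hypothetical.

```python
import numpy as np

def online_kmeans(stream, initial_centroids):
    """Sequential k-means (hypothetical helper): update centroids one point at a time.

    stream: iterable of 1-D feature vectors arriving over time
    initial_centroids: array-like of shape (k, d) used to seed the clusters
    """
    centroids = np.array(initial_centroids, dtype=float)
    counts = np.ones(len(centroids))  # each seed counts as one observation
    labels = []
    for x in stream:
        x = np.asarray(x, dtype=float)
        # Assign the point to the closest centroid (Euclidean distance)
        j = int(np.argmin(np.linalg.norm(centroids - x, axis=1)))
        counts[j] += 1
        # Move the centroid towards the point: running mean of seed + members
        centroids[j] += (x - centroids[j]) / counts[j]
        labels.append(j)
    return centroids, labels

# Hypothetical toy stream: two well-separated groups of customers
stream = [[0.1, 0.0], [5.0, 5.1], [0.0, 0.2], [4.9, 5.0]]
centroids, labels = online_kmeans(stream, initial_centroids=[[0.0, 0.0], [5.0, 5.0]])
print(labels)  # [0, 1, 0, 1]
```

Because each update touches only one centroid, new customers can be segmented as they arrive without re-running the clustering on the full history; the price is that the result depends on the seeds and on the arrival order.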

Professor's suggestions
- temporal slicing of the data
- distribution of times, to understand how the clusters change
- understand whether the times are uniform
- look at the parameters as a whole to see how the clustering changes as new users are added
- carry out a temporal study of the algorithm, evaluating how the data arrive and keeping time as a parameter
- networks that can be built
- bipartite networks: people - product - people


## Preprocessing
- Data quality check
- Data structure check
- Analysis of the quantitative variables
- Analysis of the categorical variables



In [163]:
import pandas as pd
customer_df = pd.read_csv("data/customers.csv")
product_df = pd.read_csv("data/products.csv")
purchase_df = pd.read_csv("data/purchases.csv")

df = purchase_df.merge(customer_df, on="CustomerID").merge(product_df, on="product_id")
df.columns

Index(['InvoiceID', 'date', 'CustomerID', 'product_id', 'quantity',
       'customer_type', 'item', 'category', 'price', 'type'],
      dtype='object')

In [164]:
# Structure of the data
n_row, n_cols = df.shape
print(f"Number of observation: {n_row} \nNumber of feature: {n_cols}")

# Analysis of the columns
print("The Feature are:")
print(df.columns.tolist())

# Analysis of the type of the columns
print(df.dtypes)

Number of observation: 436689 
Number of feature: 10
The Feature are:
['InvoiceID', 'date', 'CustomerID', 'product_id', 'quantity', 'customer_type', 'item', 'category', 'price', 'type']
InvoiceID          int64
date              object
CustomerID         int64
product_id         int64
quantity           int64
customer_type     object
item              object
category          object
price            float64
type              object
dtype: object


Let's analyze the time frame of this dataset, in particular:
- the start and end dates
- the number of missing days

In [165]:
# Convert to datetime first, so that min/max are chronological rather than lexicographic
df['date'] = pd.to_datetime(df['date'])
date_min = df['date'].min()
date_max = df['date'].max()

all_days = pd.date_range(date_min.normalize(), date_max.normalize(), freq='D')
missing_days = all_days.difference(df['date'].dt.normalize().unique())

print("Time Frame: From:", date_min.date(), " to ", date_max.date())
print("Missing days:", len(missing_days))

Time Frame: From: 2014-01-01  to  2015-12-30
Missing days: 1


## Database Explanation
In a real setting, we would have four types of datasets: Demographic Data, Purchase Data, Product Data, and Feedback Data. The three used in this project are:
- Demographic Data contains customer information:
    - CustomerID (Int)
    - customer_type (String)
- Purchase Data contains transaction and purchase information:
    - InvoiceID (Int)
    - CustomerID (Int, to link with Demographic Data)
    - product_id (Int, to link with Product Data)
    - date (Date)
    - quantity (Int)

- Product Data contains product information:
    - product_id (Int)
    - item (String)
    - category (String)
    - price (Float)
    - type (String) ("supermarket", "Online")

Notes:
- Demographic Data and Purchase Data share CustomerID, while Purchase Data and Product Data share product_id; these keys allow joining the three datasets.
- InvoiceID is useful to link purchase and feedback data.
- Only the demographic data will remain static during the analysis.


## ETL and Algorithm pipelines
We have a simulated portion of a retail store database, divided into separate datasets. Two of them, demographic_data and product_data, represent master data: information that rarely changes and would typically be stored in a database. The other two, purchase_data and feedback_data, represent transactional data, generated in real time and likely stored in a data lake.

First, we define the schemas with which the different datasets enter the Spark job as a payload.

In [166]:
from pyspark.sql import SparkSession

# We open the spark session to use the Dataframe api
spark = SparkSession.builder \
    .appName("data_mining_project") \
    .config("spark.ui.showConsoleProgress", "false") \
    .getOrCreate()

# Then we open spark context to use the RDD api
sc = spark.sparkContext


In [167]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType

demographic_scheme = StructType([
    StructField("CustomerID", IntegerType(), True),
    StructField("customer_type", StringType(), True)
])

purchase_scheme = StructType([
    StructField("InvoiceID", IntegerType(), False),
    StructField("date", DateType(), False),
    StructField("CustomerID", IntegerType(), False),
    StructField("product_id", StringType(), False),
    StructField("quantity", IntegerType(), False)
])

product_scheme = StructType([
    StructField("product_id", StringType(), False),
    StructField("item", StringType(), False),
    StructField("category", StringType(), False),
    StructField("price", FloatType(), False),
    StructField("type", StringType(), False)

])


In [168]:
demographic_df = spark.read.format("csv")\
    .option("header", "true")\
    .schema(demographic_scheme)\
    .load("data/customers.csv")

purchase_df = spark.read.format("csv")\
    .option("header", "true")\
    .schema(purchase_scheme)\
    .load("data/purchases.csv")

product_df = spark.read.format("csv")\
    .option("header", "true")\
    .schema(product_scheme)\
    .load("data/products.csv")

demographic_df.printSchema()
purchase_df.printSchema()
product_df.printSchema()


root
 |-- CustomerID: integer (nullable = true)
 |-- customer_type: string (nullable = true)

root
 |-- InvoiceID: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: integer (nullable = true)

root
 |-- product_id: string (nullable = true)
 |-- item: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: float (nullable = true)
 |-- type: string (nullable = true)



In [169]:
from pyspark.sql.functions import collect_list, col, array_distinct

(product_df.filter(col("type") == "supermarket")).collect()

[Row(product_id='522', item='bottled water', category='Miscellaneous', price=7.610000133514404, type='supermarket'),
 Row(product_id='601', item='candles', category='Miscellaneous', price=3.569999933242798, type='supermarket'),
 Row(product_id='1036', item='domestic eggs', category='Miscellaneous', price=1.1699999570846558, type='supermarket'),
 Row(product_id='1005', item='dishes', category='Miscellaneous', price=3.5899999141693115, type='supermarket'),
 Row(product_id='1754', item='instant food products', category='Miscellaneous', price=2.2799999713897705, type='supermarket'),
 Row(product_id='3637', item='tropical fruit', category='Produce', price=3.7300000190734863, type='supermarket'),
 Row(product_id='3280', item='shopping bags', category='Household', price=1.5499999523162842, type='supermarket'),
 Row(product_id='565', item='butter', category='Dairy', price=4.840000152587891, type='supermarket'),
 Row(product_id='2304', item='other vegetables', category='Produce', price=4.980000

### Market Basket Analysis

To perform market basket analysis, we search for the most frequent itemsets, to understand which products customers tend to buy together.
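
The support of an itemset is the number of baskets that contain all of its items, and an itemset is frequent when its support reaches a chosen threshold. A minimal sketch on hypothetical toy baskets, counting pairs by brute force (exactly the enumeration that A-priori tries to avoid on larger data); `support_counts` is a hypothetical helper:

```python
from collections import Counter
from itertools import combinations

# Hypothetical toy baskets (not taken from the project data)
baskets = [
    {"whole milk", "yogurt", "rolls/buns"},
    {"whole milk", "soda"},
    {"yogurt", "whole milk"},
    {"soda", "rolls/buns"},
]

def support_counts(baskets, size):
    """Count in how many baskets each itemset of the given size appears."""
    counts = Counter()
    for basket in baskets:
        # sorted() gives a deterministic order; frozenset makes the key order-independent
        for itemset in combinations(sorted(basket), size):
            counts[frozenset(itemset)] += 1
    return counts

min_support = 2  # absolute threshold: the pair must appear in at least 2 baskets
pair_counts = support_counts(baskets, 2)
frequent_pairs = {s: c for s, c in pair_counts.items() if c >= min_support}
print(frequent_pairs)  # only {whole milk, yogurt} reaches the threshold
```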

In [170]:
# Preprocessing
from pyspark.sql.functions import collect_list, col, array_distinct, sort_array

transactions = (purchase_df
            .join(product_df, on="product_id", how="inner")
            .filter(col("type") == "supermarket")
            .groupBy("InvoiceID")
            .agg(collect_list("item").alias("item"))
            .withColumn("item", sort_array(array_distinct(col("item")))))

transactions.show()

+---------+--------------------+
|InvoiceID|                item|
+---------+--------------------+
|        1|[sausage, semi-fi...|
|        2|[pastry, salty sn...|
|        3|[canned beer, mis...|
|        4|[hygiene articles...|
|        5|[pickled vegetabl...|
|        6| [curd, frankfurter]|
|        7|[rolls/buns, saus...|
|        8|  [soda, whole milk]|
|        9| [beef, white bread]|
|       10|[frankfurter, sod...|
|       11|[frozen vegetable...|
|       12|[butter, whole milk]|
|       13|[sugar, tropical ...|
|       14|[butter milk, spe...|
|       15|[rolls/buns, saus...|
|       16|[detergent, root ...|
|       17|[dental care, fro...|
|       18|        [rolls/buns]|
|       19|[cling film/bags,...|
|       20|[canned beer, fro...|
+---------+--------------------+
only showing top 20 rows


#### A-priori Algorithm
The A-priori algorithm finds frequent itemsets by exploiting the monotonicity principle. In this section we explore the scalability of the algorithm by implementing three versions of it:
- the original version with pruning
- a version that encodes items in a hash table and stores pair counts differently, which consumes less memory
- a scalable version with Spark
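
The monotonicity principle says that if an itemset is frequent, all of its subsets are frequent; equivalently, a k-itemset with even one infrequent (k-1)-subset can be discarded without counting it. A minimal sketch of this check, where `has_infrequent_subset` and the frequent pairs are hypothetical:

```python
from itertools import combinations

def has_infrequent_subset(candidate, frequent_prev):
    """True if some (k-1)-subset of the candidate is not among the frequent (k-1)-itemsets."""
    k = len(candidate)
    return any(frozenset(sub) not in frequent_prev
               for sub in combinations(candidate, k - 1))

# Hypothetical frequent 2-itemsets (L2)
L2 = {frozenset(p) for p in [("milk", "bread"), ("milk", "butter"),
                             ("bread", "butter"), ("milk", "eggs")]}

candidates = [frozenset(c) for c in [("milk", "bread", "butter"),
                                     ("milk", "bread", "eggs")]]
# {milk, bread, eggs} is pruned because {bread, eggs} is not frequent
pruned = [c for c in candidates if not has_infrequent_subset(c, L2)]
print(pruned)
```

Only the surviving candidates have to be counted against the baskets, which is where most of the running time goes.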

To perform the A-priori algorithm, we have to make sure that the data:



To evaluate performance, we use a custom timer decorator that prints the execution time of each function.

In [171]:
import time
from functools import wraps

def timer(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"[TIMER] {func.__name__} executed in {end_time - start_time:.4f} seconds")
        return result
    return wrapper


##### Vanilla A-priori algorithm

In [172]:
from collections import defaultdict
from itertools import combinations

# Collect the baskets to the driver as a list of Python sets
baskets_list = [
    set(row.item) if hasattr(row, 'item') else set(row)
    for row in transactions.select("item").collect()
]

num_transactions = len(baskets_list)
print(f"Number of baskets (transactions) in the sample: {num_transactions}")


Number of baskets (transactions) in the sample: 14963


In [173]:
# Step 1: support for singletons (L1)
@timer
def get_L1(baskets, min_support):
    item_counts = defaultdict(int)

    for basket in baskets:
        for item in basket:
            item_counts[frozenset([item])] += 1

    L1 = {
        itemset: count
        for itemset, count in item_counts.items()
        if count >= min_support
    }

    return L1



In [174]:

def all_subsets_frequent(itemset, subset_size, frequent_itemsets):
    """Check that all subsets of size subset_size are frequent"""
    from itertools import combinations

    for subset in combinations(itemset, subset_size):
        if frozenset(subset) not in frequent_itemsets:
            return False
    return True

In [175]:
@timer
def generate_candidates(prev_L, k):
    candidates = set()
    prev_itemsets = list(prev_L.keys())

    for i in range(len(prev_itemsets)):
        for j in range(i + 1, len(prev_itemsets)):
            union = prev_itemsets[i] | prev_itemsets[j]

            if len(union) == k:
                # PRUNING: keep the candidate only if all its subsets of size k-1 are frequent
                if all_subsets_frequent(union, k-1, prev_L):
                    candidates.add(union)

    return candidates



In [176]:
@timer
def count_candidates(baskets, candidates):
    counts = defaultdict(int)

    for basket in baskets:
        for candidate in candidates:
            if candidate.issubset(basket):
                counts[candidate] += 1

    return counts


In [177]:
@timer
def apriori(baskets, min_support):
    """Vanilla A-priori: min_support is relative (e.g. 0.002 = 0.2% of the baskets)."""
    min_support_absolute = int(min_support * len(baskets))

    # Create the singletons (L1)
    L1 = get_L1(baskets, min_support_absolute)
    L = dict(L1)

    # Iteratively build the frequent k-itemsets
    k = 2
    current_L = L1

    while current_L:
        Ck = generate_candidates(current_L, k)

        # If there are no candidates, we can stop the algorithm
        if not Ck:
            break

        candidate_counts = count_candidates(baskets, Ck)

        current_L = {
            itemset: count
            for itemset, count in candidate_counts.items()
            if count >= min_support_absolute
        }

        L.update(current_L)
        k += 1

    return L

In [178]:
print(apriori(baskets_list, min_support = 0.002))

[TIMER] get_L1 executed in 0.0062 seconds
[TIMER] generate_candidates executed in 0.0074 seconds
[TIMER] count_candidates executed in 5.3904 seconds
[TIMER] generate_candidates executed in 0.0056 seconds
[TIMER] count_candidates executed in 0.2200 seconds
[TIMER] apriori executed in 5.6303 seconds
{frozenset({'whole milk'}): 2363, frozenset({'yogurt'}): 1285, frozenset({'sausage'}): 903, frozenset({'semi-finished bread'}): 142, frozenset({'pastry'}): 774, frozenset({'salty snack'}): 281, frozenset({'misc. beverages'}): 236, frozenset({'canned beer'}): 702, frozenset({'hygiene articles'}): 205, frozenset({'soda'}): 1453, frozenset({'pickled vegetables'}): 134, frozenset({'curd'}): 504, frozenset({'frankfurter'}): 565, frozenset({'rolls/buns'}): 1646, frozenset({'beef'}): 508, frozenset({'white bread'}): 359, frozenset({'whipped/sour cream'}): 654, frozenset({'other vegetables'}): 1827, frozenset({'frozen vegetables'}): 419, frozenset({'butter'}): 527, frozenset({'sugar'}): 265, frozense

The A-priori function returns a Python dictionary that maps each frequent itemset to its support count, i.e. the number of baskets in which it appears.
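
Since this dictionary holds the support count of every frequent itemset, and by monotonicity every subset of a frequent itemset is also in it, association rules X → Y can be derived from it directly, with confidence(X → Y) = support(X ∪ Y) / support(X). A minimal sketch, where `association_rules`, the toy counts, and the `min_confidence` threshold are hypothetical:

```python
from itertools import combinations

def association_rules(frequent, min_confidence):
    """Derive rules X -> Y from a {frozenset: support_count} dictionary."""
    rules = []
    for itemset, count in frequent.items():
        if len(itemset) < 2:
            continue
        # Try every non-empty proper subset of the itemset as the antecedent X
        for r in range(1, len(itemset)):
            for antecedent in combinations(itemset, r):
                X = frozenset(antecedent)
                Y = itemset - X
                confidence = count / frequent[X]  # X is frequent by monotonicity
                if confidence >= min_confidence:
                    rules.append((X, Y, confidence))
    return rules

# Hypothetical support counts in the same format returned by apriori()
frequent = {
    frozenset({"whole milk"}): 40,
    frozenset({"yogurt"}): 20,
    frozenset({"whole milk", "yogurt"}): 10,
}
rules = association_rules(frequent, min_confidence=0.3)
for X, Y, confidence in rules:
    print(set(X), "->", set(Y), f"confidence={confidence:.2f}")
# {'yogurt'} -> {'whole milk'} confidence=0.50
```

Here {whole milk} → {yogurt} is discarded because its confidence is only 10/40 = 0.25.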

##### Optimized A-priori algorithm
Following the book, the previous algorithm can be optimized in two ways:
- encoding the items as integer IDs through a hash table, to save memory
- storing the pair counts more compactly, either as:
    - a triangular matrix
    - the triples method

I have implemented both approaches to save time and space.

**ADD PRUNING**
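
With items encoded as integers 0..n-1, the triangular matrix stores the count of the pair {i, j} with i < j in a one-dimensional array of n(n-1)/2 cells, at index k = i·n − i(i+1)/2 + (j − i − 1), which is the 0-based form of the usual triangular-matrix indexing. The triples method instead stores (i, j, count) only for the pairs that actually occur. Assuming 4-byte integers (as with the `np.int32` array used below), the triangular matrix costs 4 bytes per possible pair and the triples method 12 bytes per occurring pair, so triples save space only when fewer than one third of all possible pairs occur; a Python dict spends considerably more than 12 bytes per entry. A quick check of the index formula, with `pair_index` as a hypothetical name:

```python
def pair_index(i, j, n):
    """0-based position of the pair {i, j} (i != j) in a triangular array over n items."""
    if i > j:
        i, j = j, i
    return i * n - i * (i + 1) // 2 + (j - i - 1)

# Every pair of 5 items maps to a distinct cell of a 10-cell array, in order
n = 5
indices = [pair_index(i, j, n) for i in range(n) for j in range(i + 1, n)]
print(indices)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# Memory estimate for the 167 items of this dataset, assuming 4-byte counters
n_items = 167
possible_pairs = n_items * (n_items - 1) // 2
triangular_bytes = 4 * possible_pairs
break_even_pairs = possible_pairs // 3  # triples are smaller below this many occurring pairs
print(possible_pairs, triangular_bytes, break_even_pairs)  # 13861 55444 4620
```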

In [179]:
from collections import defaultdict
import numpy as np
from functools import wraps

# Encoding in a hash table
def encode_baskets(baskets):
    #Convert items (string) to ids (int)
    item_to_id = {}
    id_to_item = {}
    item_id = 0

    for basket in baskets:
        for item in basket:
            if item not in item_to_id:
                item_to_id[item] = item_id
                id_to_item[item_id] = item
                item_id += 1

    baskets_encoded = [
        frozenset(item_to_id[item] for item in basket)
        for basket in baskets
    ]

    return item_to_id, id_to_item, baskets_encoded


def decode_itemsets(L_encoded, id_to_item):
    #Convert ids (int) to items (string)
    L_decoded = {}
    for itemset_ids, count in L_encoded.items():
        itemset_names = frozenset(id_to_item[item_id] for item_id in itemset_ids)
        L_decoded[itemset_names] = count
    return L_decoded


# Step 1: count singletons
@timer
def get_L1(baskets, min_support):
    item_counts = defaultdict(int)

    for basket in baskets:
        for item in basket:
            item_counts[frozenset([item])] += 1

    L1 = {
        itemset: count
        for itemset, count in item_counts.items()
        if count >= min_support
    }

    return L1


# ============================================================================
# STEP 2: COUNT PAIRS - TRIANGULAR MATRIX
# ============================================================================

def create_triangular_matrix(n_items):
    """Create a flat array with one counter for each unordered pair of items"""
    size = n_items * (n_items - 1) // 2
    return np.zeros(size, dtype=np.int32)


def pair_to_index(i, j, n):
    """Convert the pair (i, j) of 0-based item IDs into its index k in the flat array"""
    if i >= j:
        i, j = j, i
    return i * n - i * (i + 1) // 2 + (j - i - 1)


@timer
def count_pairs_triangular(baskets, L1, min_support, n_items):
    """Count pairs using the triangular matrix"""

    # Extract the frequent items
    frequent_items = set()
    for itemset in L1.keys():
        frequent_items.update(itemset)

    # Create the triangular matrix
    pair_counts = create_triangular_matrix(n_items)

    # Count the pairs
    for basket in baskets:
        frequent_in_basket = [item for item in basket if item in frequent_items]

        for i in range(len(frequent_in_basket)):
            for j in range(i + 1, len(frequent_in_basket)):
                item1 = frequent_in_basket[i]
                item2 = frequent_in_basket[j]

                if item1 > item2:
                    item1, item2 = item2, item1

                k = pair_to_index(item1, item2, n_items)
                pair_counts[k] += 1

    # Extract the frequent pairs
    L2 = {}
    frequent_list = sorted(frequent_items)

    for i in range(len(frequent_list)):
        for j in range(i + 1, len(frequent_list)):
            item1, item2 = frequent_list[i], frequent_list[j]
            k = pair_to_index(item1, item2, n_items)
            count = int(pair_counts[k])

            if count >= min_support:
                L2[frozenset([item1, item2])] = count

    return L2


# ============================================================================
# STEP 2: COUNT PAIRS - TRIPLES METHOD
# ============================================================================

@timer
def count_pairs_triples(baskets, L1, min_support):
    """Count pairs using the triples method (dictionary)"""

    # Extract the frequent items
    frequent_items = set()
    for itemset in L1.keys():
        frequent_items.update(itemset)

    # Count the pairs
    pair_counts = {}

    for basket in baskets:
        frequent_in_basket = [item for item in basket if item in frequent_items]

        for i in range(len(frequent_in_basket)):
            for j in range(i + 1, len(frequent_in_basket)):
                item1 = frequent_in_basket[i]
                item2 = frequent_in_basket[j]

                pair = (item1, item2) if item1 < item2 else (item2, item1)
                pair_counts[pair] = pair_counts.get(pair, 0) + 1

    # Extract the frequent pairs
    L2 = {
        frozenset(pair): count
        for pair, count in pair_counts.items()
        if count >= min_support
    }

    return L2


# ============================================================================
# STEP 3+: LARGER ITEMSETS
# ============================================================================

@timer
def generate_candidates(prev_L, k):
    candidates = set()
    prev_itemsets = list(prev_L.keys())

    for i in range(len(prev_itemsets)):
        for j in range(i + 1, len(prev_itemsets)):
            union = prev_itemsets[i] | prev_itemsets[j]
            if len(union) == k:
                candidates.add(union)

    return candidates


@timer
def count_candidates(baskets, candidates):
    counts = defaultdict(int)

    for basket in baskets:
        for candidate in candidates:
            if candidate.issubset(basket):
                counts[candidate] += 1

    return counts



@timer
def apriori_optimized(baskets, method, min_support=0.01):
    """
    A-priori algorithm with item encoding and compact pair counting

    Args:
        baskets: list of set/frozenset
        method: 'triples' or 'triangular'
        min_support: relative minimum support (e.g. 0.01 = 1%)
    """

    # Encoding
    item_to_id, id_to_item, baskets_encoded = encode_baskets(baskets)
    n_items = len(item_to_id)
    min_support_count = int(min_support * len(baskets))

    print(f"Items: {n_items}, Baskets: {len(baskets)}, Min support: {min_support_count}")

    # Step 1: Singletons
    L1 = get_L1(baskets_encoded, min_support_count)
    print(f"L1: {len(L1)} itemset")

    # Step 2: Pairs
    if method == 'triangular':
        L2 = count_pairs_triangular(baskets_encoded, L1, min_support_count, n_items)
    elif method == 'triples':
        L2 = count_pairs_triples(baskets_encoded, L1, min_support_count)
    else:
        raise ValueError("Please insert a method: 'triangular' or 'triples'")

    print(f"L2: {len(L2)} itemset")

    # Step 3+: larger itemsets
    L = {**L1, **L2}
    k = 3
    current_L = L2

    while current_L:
        Ck = generate_candidates(current_L, k)

        if not Ck:
            break

        candidate_counts = count_candidates(baskets_encoded, Ck)

        current_L = {
            itemset: count
            for itemset, count in candidate_counts.items()
            if count >= min_support_count
        }

        if current_L:
            print(f"L{k}: {len(current_L)} itemset")

        L.update(current_L)
        k += 1

    # Decode
    L_decoded = decode_itemsets(L, id_to_item)

    print(f"Total: {len(L_decoded)} frequent itemsets")

    return L_decoded


# Test with the triples method
print("Apriori with triples method")
L_triples = apriori_optimized(baskets_list, min_support=0.002, method='triples')
print(L_triples)

# Test with the triangular method
print("Apriori with triangular method")
L_triangular = apriori_optimized(baskets_list, min_support=0.002, method='triangular')
print(L_triangular)



Apriori with triples method
Items: 167, Baskets: 14963, Min support: 29
[TIMER] get_L1 executed in 0.0053 seconds
L1: 130 itemset
[TIMER] count_pairs_triples executed in 0.0118 seconds
L2: 217 itemset
[TIMER] generate_candidates executed in 0.0030 seconds
[TIMER] count_candidates executed in 1.2861 seconds
Total: 347 frequent itemsets
[TIMER] apriori_optimized executed in 1.3698 seconds
{frozenset({'whole milk'}): 2363, frozenset({'yogurt'}): 1285, frozenset({'sausage'}): 903, frozenset({'semi-finished bread'}): 142, frozenset({'pastry'}): 774, frozenset({'salty snack'}): 281, frozenset({'misc. beverages'}): 236, frozenset({'canned beer'}): 702, frozenset({'hygiene articles'}): 205, frozenset({'soda'}): 1453, frozenset({'pickled vegetables'}): 134, frozenset({'curd'}): 504, frozenset({'frankfurter'}): 565, frozenset({'rolls/buns'}): 1646, frozenset({'beef'}): 508, frozenset({'white bread'}): 359, frozenset({'whipped/sour cream'}): 654, frozenset({'other vegetables'}): 1827, frozenset(

##### Spark version
1. Convert the input baskets (DataFrame) to an RDD of `frozenset`s.
2. Compute singletons (L1):
   * Map each item in baskets to `(item, 1)`
   * Reduce by key to count occurrences `(item, count)`
   * Filter by `min_support` and cache results
3. For each L_k (k ≥ 2):
   * Collect frequent itemsets from L_{k-1} and broadcast to workers
   * Generate candidate itemsets from baskets, pruning any whose subsets are not frequent
   * Count candidates, filter by `min_support`, and cache results
4. Collect all frequent itemsets and return as a DataFrame with `itemset` and `support`.


In [180]:
from pyspark.sql import DataFrame
from itertools import combinations

@timer
def apriori_rdd(transactions_df, min_support) -> DataFrame:

    # -----------------------------
    # STEP 0: RDD preparation
    # -----------------------------
    transactions_rdd = (
        transactions_df
        .select("item")
        .rdd
        .map(lambda row: frozenset(row.item))
        .cache()
    )

    num_transactions = transactions_rdd.count()
    min_support_count = int(min_support * num_transactions)
    all_frequent_itemsets = []

    # STEP 1: L1 (singletons)
    Lk = (
        transactions_rdd
        .flatMap(lambda basket: [(frozenset([item]), 1) for item in basket])
        .reduceByKey(lambda a, b: a + b)
        .filter(lambda x: x[1] >= min_support_count)
        .cache()
    )
    all_frequent_itemsets.append(Lk)
    k = 2


    # GENERAL STEP: k >= 2
    while True:
        Lk_itemsets = set(Lk.map(lambda x: x[0]).collect())

        if not Lk_itemsets:
            break

        broadcast_Lk = sc.broadcast(Lk_itemsets)

        def generate_candidates(basket):
            candidates = []
            for combo in combinations(sorted(basket), k):
                combo = frozenset(combo)
                # Pruning: every (k-1)-subset must be frequent
                if all(combo - {item} in broadcast_Lk.value for item in combo):
                    candidates.append((combo, 1))
            return candidates

        Lk = (
            transactions_rdd
            .flatMap(generate_candidates)
            .reduceByKey(lambda a, b: a + b)
            .filter(lambda x: x[1] >= min_support_count)
            .cache()
        )

        if Lk.isEmpty():
            break

        all_frequent_itemsets.append(Lk)
        k += 1

    # -----------------------------
    # Output
    # -----------------------------
    result_rdd = (
        sc.union(all_frequent_itemsets)
          .map(lambda x: (sorted(list(x[0])), x[1]))
    )
    return result_rdd.toDF(["itemset", "support"])

In [181]:
min_support = 0.002  # 0.2% of the transactions
frequent_itemsets_df = apriori_rdd(transactions, min_support)
frequent_itemsets_df.orderBy("support", ascending=True).show(truncate=False)

[TIMER] apriori_rdd executed in 1.3382 seconds
+--------------------------------+-------+
|itemset                         |support|
+--------------------------------+-------+
|[frozen meals, whole milk]      |29     |
|[ice cream, whole milk]         |29     |
|[fish]                          |29     |
|[artif. sweetener]              |29     |
|[specialty fat]                 |29     |
|[light bulbs]                   |29     |
|[grapes, whole milk]            |29     |
|[salty snack, whole milk]       |29     |
|[butter, sausage]               |29     |
|[rolls/buns, salty snack]       |29     |
|[margarine, root vegetables]    |29     |
|[sausage, shopping bags]        |29     |
|[oil, whole milk]               |29     |
|[bottled water, curd]           |29     |
|[butter, citrus fruit]          |29     |
|[beverages, whole milk]         |29     |
|[hamburger meat, rolls/buns]    |29     |
|[domestic eggs, root vegetables]|30     |
|[kitchen towels]                |30     |
|[desse

At this point we have three possible implementations, which we compare below.

In [186]:
# --- Standard A-priori (Python) ---
print("=== Standard Apriori ===")
L_standard = apriori(baskets_list, min_support=0.002)
print(f"Number of itemsets found: {len(L_standard)}")

# --- A-priori, 'triples' method (Python) ---
print("=== Apriori 'triples' method ===")
L_triples = apriori_optimized(baskets_list, min_support=0.002, method='triples')
print(f"Number of itemsets found: {len(L_triples)}")

# --- A-priori, 'triangular' method (Python) ---
print("=== Apriori 'triangular' method ===")
L_triangular = apriori_optimized(baskets_list, min_support=0.002, method='triangular')
print(f"Number of itemsets found: {len(L_triangular)}")


# A-priori on RDD (PySpark)
print("=== Apriori on RDD (PySpark) ===")
L_rdd_df = apriori_rdd(transactions, min_support=0.002)

# Number of itemsets
num_itemsets = L_rdd_df.count()
print(f"Number of itemsets found: {num_itemsets}")


=== Standard Apriori ===
[TIMER] get_L1 executed in 0.0102 seconds
[TIMER] generate_candidates executed in 0.0026 seconds
[TIMER] count_candidates executed in 5.4870 seconds
[TIMER] generate_candidates executed in 0.0034 seconds
[TIMER] count_candidates executed in 1.4163 seconds
[TIMER] apriori executed in 6.9204 seconds
Number of itemsets found: 347
=== Apriori 'triples' method ===
Items: 167, Baskets: 14963, Min support: 29
[TIMER] get_L1 executed in 0.0055 seconds
L1: 130 itemset
[TIMER] count_pairs_triples executed in 0.0122 seconds
L2: 217 itemset
[TIMER] generate_candidates executed in 0.0029 seconds
[TIMER] count_candidates executed in 1.3234 seconds
Total: 347 frequent itemsets
[TIMER] apriori_optimized executed in 1.3532 seconds
Number of itemsets found: 347
=== Apriori 'triangular' method ===
Items: 167, Baskets: 14963, Min support: 29
[TIMER] get_L1 executed in 0.0058 seconds
L1: 130 itemset
[TIMER] count_pairs_triangular executed in 0.0190 seconds
L2: 217 itemset
[TIMER

The main bottleneck of the Apriori algorithm lies in the iterative collection of frequent itemsets between iterations. In the current implementation, all frequent itemsets computed across workers are gathered to the driver at the end of each iteration and then broadcast back to the workers for the next one. This creates a scalability issue: the driver becomes both a memory and communication bottleneck, and the overhead grows as the number of frequent itemsets increases.
This limitation is inherent to the algorithm itself. In order to apply the monotonicity principle — which states that all subsets of a frequent itemset must also be frequent — each worker needs a global view of the frequent itemsets from the previous iteration to perform candidate generation and pruning correctly. This global synchronization step is unavoidable in Apriori: replacing collect and broadcast with fully distributed joins is possible, but introduces multiple shuffle stages per iteration, which in practice leads to higher latency and memory pressure.
For this reason, a more scalable alternative is FP-Growth. Unlike Apriori, FP-Growth does not explicitly generate candidate itemsets. Instead, it compresses the transaction database into a compact data structure called an FP-tree (Frequent Pattern Tree) and mines frequent patterns directly from this structure. By eliminating repeated candidate generation and reducing the number of full database scans to typically two passes instead of one per iteration as in Apriori, FP-Growth significantly reduces both communication and computation overhead in distributed environments, making it the preferred choice in large-scale settings such as Spark MLlib.
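
To make the two-pass idea concrete, here is a minimal sketch of FP-tree construction only (the recursive mining of the tree and the header table of node links are omitted): the first pass counts item supports, the second inserts each basket's frequent items, sorted by descending support, into a prefix tree so that baskets sharing a prefix share nodes. The `FPNode` class, `build_fp_tree` function, and toy baskets are hypothetical, not Spark MLlib internals.

```python
from collections import Counter

class FPNode:
    """One node of the prefix tree: an item, its count, and its children."""
    def __init__(self, item, parent):
        self.item = item
        self.parent = parent
        self.count = 0
        self.children = {}

def build_fp_tree(baskets, min_support):
    # Pass 1: count the support of each item and keep the frequent ones
    counts = Counter(item for basket in baskets for item in basket)
    frequent = {item: c for item, c in counts.items() if c >= min_support}

    root = FPNode(None, None)
    # Pass 2: insert each basket's frequent items in descending-support order
    for basket in baskets:
        items = sorted((i for i in basket if i in frequent),
                       key=lambda i: (-frequent[i], i))  # ties broken alphabetically
        node = root
        for item in items:
            if item not in node.children:
                node.children[item] = FPNode(item, node)
            node = node.children[item]
            node.count += 1
    return root, frequent

baskets = [{"milk", "bread"}, {"milk", "bread", "eggs"}, {"milk", "eggs"}, {"bread"}]
root, frequent = build_fp_tree(baskets, min_support=2)
print({item: child.count for item, child in root.children.items()})  # {'bread': 3, 'milk': 1}
```

Here four baskets with eight item occurrences are stored in five nodes, because the baskets that start with bread and milk share a path; on real data with many overlapping baskets, this compression is what makes FP-Growth attractive.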

##### FPGrowth


In [None]:
from pyspark.ml.fpm import FPGrowth

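# `baskets` must have an array column "item" (one basket per row, items unique within a basket)
fpGrowth = FPGrowth(itemsCol="item", minSupport=0.001, minConfidence=0.001)
model = fpGrowth.fit(baskets)

# Frequent itemsets
model.freqItemsets.show(truncate=False)

# Association rules
model.associationRules.show(truncate=False)

# Predictions: items suggested by the rules whose antecedents are contained in each basket
model.transform(baskets).show(truncate=False)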
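`associationRules` reports, among other columns, the confidence and lift of each rule. As a reminder of how these quantities are defined (confidence = freq(A ∪ C) / freq(A), lift = confidence / (freq(C) / N)), here is a plain-Python sketch for a single rule on toy baskets; `rule_metrics` and the baskets are hypothetical. A lift close to 1 means the antecedent says little about the consequent, which is worth keeping in mind with very low thresholds such as minConfidence=0.001.

```python
def rule_metrics(baskets, antecedent, consequent):
    """Support, confidence and lift of the rule antecedent -> consequent."""
    n = len(baskets)
    a, c = frozenset(antecedent), frozenset(consequent)
    count_a = sum(1 for b in baskets if a <= b)          # baskets containing the antecedent
    count_c = sum(1 for b in baskets if c <= b)          # baskets containing the consequent
    count_ac = sum(1 for b in baskets if (a | c) <= b)   # baskets containing both
    support = count_ac / n
    confidence = count_ac / count_a
    lift = confidence / (count_c / n)                    # > 1: positive association
    return support, confidence, lift

baskets = [{"milk", "bread"}, {"milk", "bread", "butter"}, {"bread"}, {"milk"}]
print(rule_metrics(baskets, {"milk"}, {"bread"}))
```

Here milk and bread co-occur in half of the baskets, yet the lift is below 1, so the rule milk -> bread is slightly weaker than buying bread at random.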



In [None]:
from pyspark.ml.fpm import FPGrowth

# Grid of minSupport / minConfidence values, from the highest thresholds down
support_values = [0.01, 0.005, 0.001, 0.0005, 0.0001]
confidence_values = [0.5, 0.3, 0.1, 0.05, 0.01, 0.001]

best_model = None
best_support = None
best_confidence = None

# Stop at the first combination that produces at least one rule
for s in support_values:
    for c in confidence_values:
        print(f"Testing minSupport={s}, minConfidence={c}...")
        fpGrowth = FPGrowth(itemsCol="item", minSupport=s, minConfidence=c)
        model = fpGrowth.fit(items_df)
        num_rules = model.associationRules.count()  # count once, reuse below
        if num_rules > 0:
            best_model = model
            best_support = s
            best_confidence = c
            print(f"Found rules with minSupport={s}, minConfidence={c}, rules={num_rules}")
            break
    if best_model is not None:
        break

if best_model is not None:
    print(f"\nSelected parameters: minSupport={best_support}, minConfidence={best_confidence}")
    print("Frequent itemsets:")
    best_model.freqItemsets.show(truncate=False)
    print("Association rules:")
    best_model.associationRules.show(truncate=False)
    print("Predictions:")
    best_model.transform(items_df).show(truncate=False)
else:
    print("No association rules found with the tested parameters.")


In [None]:
spark.stop()

# NEW DATASET
Association rules on a working dataset

In [None]:
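from pyspark.sql import SparkSession
from pyspark.sql.functions import array_distinct, collect_list
from pyspark.sql.types import StructType, StructField, StringType

# The previous cell stopped the session: getOrCreate() starts a new one
spark = SparkSession.builder.getOrCreate()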

transactions_schema = StructType([
    StructField("Member_number", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("itemDescription", StringType(), True)
])

groceries_dataset = spark.read.format("csv")\
    .option("header", "true")\
    .schema(transactions_schema)\
    .load("data/raw_data/groceries_dataset.csv")

items_df = groceries_dataset.groupBy("Member_number") \
    .agg(array_distinct(collect_list("itemDescription")).alias("items"))

items_df.head()



from pyspark.ml.fpm import FPGrowth

# Grid of minSupport / minConfidence values, from the highest thresholds down
support_values = [0.01, 0.005, 0.001, 0.0005, 0.0001]
confidence_values = [0.5, 0.3, 0.1, 0.05, 0.01, 0.001]

best_model = None
best_support = None
best_confidence = None

# Stop at the first combination that produces at least one rule
for s in support_values:
    for c in confidence_values:
        print(f"Testing minSupport={s}, minConfidence={c}...")
        fpGrowth = FPGrowth(itemsCol="items", minSupport=s, minConfidence=c)
        model = fpGrowth.fit(items_df)
        num_rules = model.associationRules.count()  # count once, reuse below
        if num_rules > 0:
            best_model = model
            best_support = s
            best_confidence = c
            print(f"Found rules with minSupport={s}, minConfidence={c}, rules={num_rules}")
            break
    if best_model is not None:
        break

if best_model is not None:
    print(f"\nSelected parameters: minSupport={best_support}, minConfidence={best_confidence}")
    print("Frequent itemsets:")
    best_model.freqItemsets.show(truncate=False)
    print("Association rules:")
    best_model.associationRules.show(truncate=False)
    print("Predictions:")
    best_model.transform(items_df).show(truncate=False)
else:
    print("No association rules found with the tested parameters.")
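In the cell above each basket collects every distinct item a member ever bought, across all dates; keying on both Member_number and Date would instead give one basket per shopping visit, which changes what support and confidence measure (share of members versus share of visits). The plain-Python sketch below, using a hypothetical `build_baskets` helper and toy rows, mirrors `groupBy(...).agg(array_distinct(collect_list(...)))` for both choices. Unlike this sketch, Spark does not guarantee the order of the elements returned by `collect_list`.

```python
from collections import defaultdict

def build_baskets(rows, key):
    """Group (member, date, item) rows into baskets of distinct items per key."""
    baskets = defaultdict(list)
    for row in rows:
        basket = baskets[key(row)]
        if row[2] not in basket:   # same role as array_distinct
            basket.append(row[2])
    return dict(baskets)

# Hypothetical toy rows in the (Member_number, Date, itemDescription) layout
rows = [
    ("m1", "d1", "bread"),
    ("m1", "d1", "milk"),
    ("m1", "d2", "milk"),
    ("m2", "d1", "milk"),
]
per_member = build_baskets(rows, key=lambda r: r[0])          # like groupBy("Member_number")
per_visit = build_baskets(rows, key=lambda r: (r[0], r[1]))   # like groupBy("Member_number", "Date")
print(per_member)
print(per_visit)
```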



In [None]:
spark.stop()

In [None]:
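from pyspark.sql import SparkSession
from pyspark.sql.functions import array_distinct, col, collect_list

# The previous cell stopped the session: getOrCreate() starts a new one
spark = SparkSession.builder.getOrCreate()

online_retail = spark.read.format("csv")\
    .option("header", "true")\
    .load("data/raw_data/online_retail.csv")

online_retail.head()


# Rows with a missing CustomerID (if any) would otherwise be merged into one huge "null" basket
items_df = online_retail.filter(col("CustomerID").isNotNull()) \
    .groupBy("CustomerID") \
    .agg(array_distinct(collect_list("Description")).alias("items"))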

items_df.head()



from pyspark.ml.fpm import FPGrowth

# Grid of minSupport / minConfidence values, from the highest thresholds down
support_values = [0.01, 0.005, 0.001, 0.0005, 0.0001]
confidence_values = [0.5, 0.3, 0.1, 0.05, 0.01, 0.001]

best_model = None
best_support = None
best_confidence = None

# Stop at the first combination that produces at least one rule
for s in support_values:
    for c in confidence_values:
        print(f"Testing minSupport={s}, minConfidence={c}...")
        fpGrowth = FPGrowth(itemsCol="items", minSupport=s, minConfidence=c)
        model = fpGrowth.fit(items_df)
        num_rules = model.associationRules.count()  # count once, reuse below
        if num_rules > 0:
            best_model = model
            best_support = s
            best_confidence = c
            print(f"Found rules with minSupport={s}, minConfidence={c}, rules={num_rules}")
            break
    if best_model is not None:
        break

if best_model is not None:
    print(f"\nSelected parameters: minSupport={best_support}, minConfidence={best_confidence}")
    print("Frequent itemsets:")
    best_model.freqItemsets.show(truncate=False)
    print("Association rules:")
    best_model.associationRules.show(truncate=False)
    print("Predictions:")
    best_model.transform(items_df).show(truncate=False)
else:
    print("No association rules found with the tested parameters.")


In [None]:
import pandas as pd
onl = pd.read_csv("data/raw_data/online_retail.csv")
onl


In [None]:
import math
from itertools import combinations

from pyspark.sql import DataFrame

def apriori_pure_rdd(transactions_df, min_support) -> DataFrame:
    """Apriori on RDDs that replaces collect/broadcast with distributed joins.

    Expects an array column "item" (one basket per row) and returns a DataFrame
    with columns itemset (sorted list) and support (absolute count)."""

    # -----------------------------
    # STEP 0: Preparation
    # -----------------------------
    transactions_rdd = (
        transactions_df
        .select("item")
        .rdd
        .map(lambda row: frozenset(row.item))
        .cache()
    )

    num_transactions = transactions_rdd.count()  # the only action needed up front
    # Round up: an itemset must occur in at least min_support * N baskets
    min_support_count = math.ceil(min_support * num_transactions)
    all_frequent_itemsets = []

    # -----------------------------
    # STEP 1: frequent 1-itemsets (L1)
    # -----------------------------
    Lk = (
        transactions_rdd
        .flatMap(lambda basket: [(frozenset([item]), 1) for item in basket])
        .reduceByKey(lambda a, b: a + b)
        .filter(lambda x: x[1] >= min_support_count)
        .cache()
    )
    all_frequent_itemsets.append(Lk)
    k = 2

    # -----------------------------
    # GENERAL STEP: k >= 2
    # -----------------------------
    while not Lk.isEmpty():

        # ---- Candidate generation via distributed self-join ----
        # Map each itemset to (prefix, last_item);
        # two itemsets sharing the same prefix are merged into a size-k candidate
        prefix_rdd = (
            Lk
            .map(lambda x: sorted(x[0]))
            .map(lambda items: (
                tuple(items[:-1]),   # prefix = first k-2 items
                items[-1]            # last item
            ))
        )

        # Self-join on the prefix -> every pair (last_a, last_b) with last_a < last_b
        candidates_rdd = (
            prefix_rdd
            .groupByKey()
            .flatMap(lambda x: [
                frozenset(x[0]) | frozenset([a, b])
                for a, b in combinations(sorted(x[1]), 2)
            ])
        )
        # candidates_rdd: size-k candidate frozensets (no duplicates)

        # ---- Distributed pruning via join ----
        # For each candidate, generate all of its (k-1)-subsets,
        # then use a join to check that each of them is in Lk

        # subsets_rdd: (subset, parent_candidate)
        subsets_rdd = candidates_rdd.flatMap(lambda candidate: [
            (candidate - {item}, candidate)   # drop one item -> (k-1)-subset
            for item in candidate
        ])

        # Lk as a key-only RDD for the join: (frozenset, None)
        Lk_keys = Lk.map(lambda x: (x[0], None))

        # Inner join: only subsets that exist in Lk survive;
        # keep the parent candidate (one record per frequent subset)
        valid_subsets = (
            subsets_rdd
            .join(Lk_keys)
            .map(lambda x: x[1][0])
        )

        # A candidate is valid only if ALL of its k subsets are in Lk,
        # i.e. it must appear exactly k times in valid_subsets
        pruned_candidates = (
            valid_subsets
            .map(lambda candidate: (candidate, 1))
            .reduceByKey(lambda a, b: a + b)
            .filter(lambda x, k=k: x[1] == k)   # bind the current k (closures run lazily)
            .map(lambda x: x[0])
        )

        # ---- Distributed support counting ----
        # Expand each basket into all of its size-k subsets (C(|basket|, k) per basket)
        basket_subsets = transactions_rdd.flatMap(lambda basket, k=k: [
            (frozenset(combo), 1)
            for combo in combinations(sorted(basket), k)
        ])

        # Join with the valid candidates so that only they are counted
        pruned_keys = pruned_candidates.map(lambda c: (c, None))

        Lk = (
            basket_subsets
            .join(pruned_keys)                  # keep only valid candidates
            .map(lambda x: (x[0], x[1][0]))     # (candidate, 1)
            .reduceByKey(lambda a, b: a + b)    # sum occurrences
            .filter(lambda x: x[1] >= min_support_count)
            .cache()
        )

        if Lk.isEmpty():
            break

        all_frequent_itemsets.append(Lk)
        k += 1

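    # -----------------------------
    # Output
    # -----------------------------
    # Use the RDD's own SparkContext instead of relying on a global `sc`
    result_rdd = (
        transactions_rdd.context.union(all_frequent_itemsets)
        .map(lambda x: (sorted(x[0]), x[1]))   # support = absolute count
    )
    return result_rdd.toDF(["itemset", "support"])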
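The counting step in `apriori_pure_rdd` enumerates every size-k combination of each basket, so it emits C(|basket|, k) records per basket regardless of how few candidates survive pruning. With customer-level baskets this can dominate the shuffle. The sketch below gives rough operation counts for that strategy versus testing each candidate for containment in the basket (which requires the candidates on every worker, i.e. the broadcast the join-based version avoids); the helper names, basket sizes and candidate count are hypothetical, and the cost of each individual subset test is ignored.

```python
from math import comb

def emitted_subsets(basket_sizes, k):
    """(itemset, 1) records produced by enumerating every size-k subset of each basket."""
    return sum(comb(n, k) for n in basket_sizes)

def containment_checks(basket_sizes, num_candidates):
    """Subset tests needed if each basket is instead checked against every candidate."""
    return len(basket_sizes) * num_candidates

def cheaper_per_basket(n, k, num_candidates):
    """Pick, for one basket of n items, the strategy with fewer operations."""
    return "enumerate" if comb(n, k) <= num_candidates else "check candidates"

sizes = [10, 50, 200]   # hypothetical basket sizes
print(emitted_subsets(sizes, 3))        # 120 + 19,600 + 1,313,400
print(containment_checks(sizes, 500))   # 3 baskets x 500 hypothetical candidates
print([cheaper_per_basket(n, 3, 500) for n in sizes])
```

A per-basket choice like `cheaper_per_basket` keeps enumeration for small baskets while avoiding the combinatorial blow-up on large ones, at the price of shipping the candidate set to the workers.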
