



---

# MARKET-BASKET ANALYSIS

## Massive Algorithm 
### Data Science for Economics

##### Angelica Longo, Melissa Rizzi

The goal of this project is to implement a system for **detecting frequent itemsets**, commonly known as **market-basket analysis**.
In this notebook, the detector treats each user’s reviewed books as a basket, with books serving as items.

The project is based on the **[Amazon Books Review](https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews)** dataset, published on Kaggle under the public domain CC0 license. Data is downloaded during the execution of the scripts via an API and contains variables related to users and their reviews of purchased books.

Given the large volume of data (3 million rows), a reasonable subsample is created using **PySpark**, consisting of approximately 500,000 rows, while ensuring scalability for the full dataset.

The project is structured as follows:

- **Preprocessing** – This phase includes data cleaning, checking data integrity, handling null values, removing duplicates, and computing the overall mean to verify consistency with the selected subsample.
- **Subsampling** – A subset of data is created while maintaining a representative distribution of user choices and ratings.
- **Frequent Itemset Mining** – The final step involves implementing an algorithm to identify frequent itemsets within the dataset.

This structured approach ensures both **efficiency** and **scalability** while maintaining **data integrity**.

### Table of Contents
- [1. Data Import](#1-Data-Import)
- [2. Data PreProcessing](#2-data-preprocessing)
  - [2.1 Data Integrity](#21-data-integrity)
  - [2.2 Missing Data](#22-missing-data)
  - [2.3 Data Duplicates](#22-data-duplicates)
  - [2.4 Rating Means](#22-rating-means)
- [3. Subsample Creation](#3-subsample-creation)
- [4. Frequent Itemset Mining](#4-frequent-itemset-mining)


---
### 1. Data Import

In [1]:
#import os
#import zipfile

In [2]:
#os.environ['KAGGLE_USERNAME'] = "melissarizzi"
#os.environ['KAGGLE_KEY'] = "3ed913e7329a3117a254e67179c0f8bb"

In [3]:
#!pip install kaggle

In [4]:
#!kaggle datasets download -d mohamedbakhet/amazon-books-reviews

In [5]:
#with zipfile.ZipFile("amazon-books-reviews.zip", 'r') as zip_ref:
#    zip_ref.extractall("amazon_books_data")

---
### 2. Data PreProcessing

In [6]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max, sum, when, collect_set, count
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as F
from pyspark.ml.fpm import FPGrowth

In [7]:
import os
os.environ["PYSPARK_PYTHON"] = "python"

In [8]:
# Create Spark Session
spark = SparkSession.builder.appName("MapReduce").getOrCreate()

In [9]:
# Import data
data = spark.read.csv("amazon_books_data/Books_rating.csv", header=True, inferSchema=True)
#data.show(5)

In [10]:
#Select only useful columns
df = data.select("Id", 'Title', "User_id", "review/score",'review/text').withColumnRenamed("review/score", "score")
#df.show(5)

#### 2.1 Data Integrity

In [11]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- score: string (nullable = true)
 |-- review/text: string (nullable = true)



In [12]:
# Transform 'score' variable in double type
df = df.withColumn("score", col("score").cast(DoubleType()))
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- score: double (nullable = true)
 |-- review/text: string (nullable = true)



In [13]:
# Check score range
df.select(min(col("score")).alias("min_score"), max(col("score")).alias("max_score")).show()

+---------+----------+
|min_score| max_score|
+---------+----------+
|      1.0|1.295568E9|
+---------+----------+



In [14]:
# Keep just data with the 'score' values in the correct range [1, 5]
df = df.filter((col("score") >= 1) & (col("score") <= 5))
df.select(min("score").alias("min_score"), max("score").alias("max_score")).show()

+---------+---------+
|min_score|max_score|
+---------+---------+
|      1.0|      5.0|
+---------+---------+



#### 2.2 Missing Data

In [15]:
# Count null values for each variable
null_counts = df.select(
    [sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns]
)

null_counts.show()

+---+-----+-------+-----+-----------+
| Id|Title|User_id|score|review/text|
+---+-----+-------+-----+-----------+
|  0|  196| 561492|    0|          9|
+---+-----+-------+-----+-----------+



What stands out right away, especially for the purpose of our analysis, is that there are many missing values in the User_id variable. One possible reason for this could be that users who leave reviews but are not registered don’t have a user ID. Our goal is to identify baskets of items purchased by the same users, but without the user ID, this analysis cannot be conducted. We explored the possibility of using profile names instead, by assigning a dummy ID to users with the same name. However, we were aware that this might not provide accurate results due to potential name duplication. Moreover, there were more missing profile names than missing user IDs, which made this solution unfeasible. After considering our options, we ultimately decided to **drop the missing values**, as we couldn’t identify a suitable method to replace them.

In [16]:
# Remove null values
df_clean = df.dropna()
#df_clean.show(5)

In [17]:
# Check data size
n_rows = df.count()
n_rows_clean = df_clean.count()
print(f"Number of Rows - Before cleaning: {n_rows}")
print(f"Number of Rows - After cleaning: {n_rows_clean}")

Number of Rows - Before cleaning: 2981912
Number of Rows - After cleaning: 2420235


#### 2.3 Data Duplicates

In [18]:
# Remove duplicated rows
df_clean = df_clean.dropDuplicates()

n_rows_clean = df_clean.count()
print(f"Number of Rows - After duplicates removal: {n_rows_clean}")

Number of Rows - After duplicates removal: 2398220


Up until now, we’ve performed a general cleaning of the dataset. From here on, we’ll focus exclusively on the three columns that are relevant to our analysis (Id, User_id, and score), forming a new dataset: df_short.

In [19]:
# Remove useless columns
df_short = df_clean.select("Id", "User_id","score")
#df_short.show(5)

In [20]:
# Check and remove duplicated rows for the three considered variables
df_short= df_short.dropDuplicates()

Given that the same user could have rated the same book twice, we want to compute the mean of the different scores given by the same user to the same book.

In [21]:
# Find duplicates considering only 'Id' and 'User_id'
duplicati = df_short.groupBy("Id", "User_id").count().filter("count > 1")

In [22]:
# Compute average score for every (Id, User_id)
score_mean = df_short.groupBy('Id', 'User_id').agg(F.mean('score').alias('mean_score'))
df_final = df_short.join(score_mean, on=['Id', 'User_id'], how='left')

# Creation of the final preprocessed dataset
df_final = df_final.select('Id','User_id', 'mean_score')
df_final = df_final.dropDuplicates()
#df_final.show(5)

In [23]:
n_rows_final = df_final.count()
print(f"Number of Rows - Final dataset: {n_rows_final}")

Number of Rows - Final dataset: 2380153


#### 2.4 Rating Means

We want to calculate the overall average score to see if consistency is maintained after creating the subsample.

- Overall mean score:

In [24]:
df_final = df_final.withColumn("mean_score", F.col("mean_score").cast("double"))

overall_mean = df_final.agg(F.avg("mean_score")).collect()[0][0]
print(f"Overall mean score - Final dataset: {overall_mean}")

Overall mean score - Final dataset: 4.22386130919595


- Mean score for each item:

In [25]:
#score_per_id = df_final.groupBy("Id").agg(F.avg("mean_score").alias("avg_score_pre"))
#score_per_id.show(5)

In [26]:
# Check data integrity
#score_per_id_above_5 = score_per_id.filter(F.col("avg_score_pre") > 5)
#score_per_id_above_5.show(5)

---
### 3. Subsample Creation

We aim to create a subsample that remains consistent with the original dataset. To achieve this, we select a fraction of users while ensuring that all their reviews are included. This approach allows us to better represent their purchasing behavior and rating patterns, preserving the integrity of the data.

In [27]:
num_users = df_final.select("User_id").distinct().count()
print(f"Total number of different users - Original dataset: {num_users}")

Total number of different users - Original dataset: 1004214


In [28]:
# Keep just 20% of the users
sample_fraction = 0.2
user_sample = df_final.select("User_id").distinct().sample(fraction=sample_fraction, seed=42)

In [29]:
# Create the subsample with the selected users
df_sampled = df_final.join(user_sample, on="User_id", how="inner")
#df_sampled.show(5)

In [30]:
# Check subsample size
n_rows_sample = df_sampled.count()
print(f"Number of Rows - Sample: {n_rows_sample}")

Number of Rows - Sample: 471573


In [31]:
# Check data integrity
df_sampled.printSchema()

root
 |-- User_id: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- mean_score: double (nullable = true)



In [32]:
# Check mean coherence with the original dataset
overall_mean_sample = df_sampled.agg(F.avg("mean_score")).collect()[0][0]
print(f"Overall mean - Sample: {overall_mean_sample}")

Overall mean - Sample: 4.221354205322752


The overall mean of the subsample is coherent with the overall mean of the original final dataset.

---
### 4. Algorithm Implementation

#### 4.1 FP-Growth Algorithm

We considered only books that received a score above 3

In [33]:
# Filter and keep just rows with rating >= 3
df_filtered = df_sampled.filter(col("mean_score") >= 3)

# Filter and keep just users who rated > 1 book
user_counts = df_filtered.groupBy("User_id").agg(count("Id").alias("book_count"))
users_with_multiple_books = user_counts.filter(col("book_count") > 1).select("User_id")

df_filtered = df_filtered.join(users_with_multiple_books, on="User_id", how="inner")

In [34]:
df_filtered.show(5)

+--------------------+----------+----------+
|             User_id|        Id|mean_score|
+--------------------+----------+----------+
|A0236983QUCQMORABO03|1587888408|       5.0|
|A0236983QUCQMORABO03|1587888432|       5.0|
|A0236983QUCQMORABO03|1593352077|       5.0|
|A025268923L497N34...|B000P0UDX4|       5.0|
|A025268923L497N34...|B00005UVH9|       5.0|
+--------------------+----------+----------+
only showing top 5 rows



- Creation od Baskets of items 

In [35]:
# Create baskets of items for every user
df_basket = df_filtered.groupBy("User_id").agg(collect_set("Id").alias("items"))

In [36]:
df_basket.show(5)

+--------------------+--------------------+
|             User_id|               items|
+--------------------+--------------------+
|A0236983QUCQMORABO03|[1587888432, 1587...|
|A025268923L497N34...|[B000P0UDX4, B000...|
|A07084061WTSSXN6V...|[0808510002, B000...|
|      A100Q4BGPV187I|[0743236017, 0743...|
|      A100TQ7ZRE0W02|[0971237034, 0976...|
+--------------------+--------------------+
only showing top 5 rows



- Algorithm application

In [37]:
# Apply FP-Growth
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.01, minConfidence=0.2)
model = fpGrowth.fit(df_basket)

                        # Support: probabilità di acquisto di tutto il basket
                        # Confidence: probabilità che se compro un basket compro anche l'altro libro


In [38]:
print("Frequent Itemsets:")
model.freqItemsets.show(truncate=False)

# Count number of Frequent Itemsets
num_freq_itemsets = model.freqItemsets.count()
print(f"Number of rows of Frequent Itemsets: {num_freq_itemsets}")

Frequent Itemsets:
+------------------------------------------------+----+
|items                                           |freq|
+------------------------------------------------+----+
|[B000ILIJE0]                                    |689 |
|[B000NWU3I4]                                    |687 |
|[B000NWU3I4, B000ILIJE0]                        |686 |
|[B000PC54NG]                                    |685 |
|[B000PC54NG, B000NWU3I4]                        |684 |
|[B000PC54NG, B000NWU3I4, B000ILIJE0]            |683 |
|[B000PC54NG, B000ILIJE0]                        |684 |
|[B000NWQXBA]                                    |683 |
|[B000NWQXBA, B000PC54NG]                        |683 |
|[B000NWQXBA, B000PC54NG, B000NWU3I4]            |682 |
|[B000NWQXBA, B000PC54NG, B000NWU3I4, B000ILIJE0]|682 |
|[B000NWQXBA, B000PC54NG, B000ILIJE0]            |683 |
|[B000NWQXBA, B000NWU3I4]                        |682 |
|[B000NWQXBA, B000NWU3I4, B000ILIJE0]            |682 |
|[B000NWQXBA, B000ILIJE0]    

- Alta confidenza e lift elevato: Se vedi una regola con alta confidenza (vicina a 1.0) e un valore di lift molto alto, significa che c'è una forte correlazione tra gli articoli dell'antecedente e quelli del conseguente. Queste sono regole particolarmente utili per le raccomandazioni di prodotto.

- Basso supporto, alta confidenza e lift alto: Anche se il supporto è basso (ad esempio, 1% delle transazioni), un lift elevato e una confidenza vicina a 1.0 indicano che la regola è molto significativa per un numero ridotto di transazioni.

In [39]:
print("Association Rules:")
model.associationRules.show(truncate=False)

# Count number of Association Rules
num_association_rules = model.associationRules.count()
print(f"Number of rows of Association Rules: {num_association_rules}")

Association Rules:
+------------------------------------------------------------------------+------------+------------------+-----------------+--------------------+
|antecedent                                                              |consequent  |confidence        |lift             |support             |
+------------------------------------------------------------------------+------------+------------------+-----------------+--------------------+
|[B000NDSX6C, B000GQG7D2, B000H9R1Q0, B000PC54NG, B000NWU3I4, B000ILIJE0]|[B000GQG5MA]|0.9640522875816994|78.92355222311484|0.010936051899907321|
|[B000NDSX6C, B000GQG7D2, B000H9R1Q0, B000PC54NG, B000NWU3I4, B000ILIJE0]|[B000Q032UY]|1.0               |79.57227138643069|0.01134383688600556 |
|[B000NDSX6C, B000GQG7D2, B000H9R1Q0, B000PC54NG, B000NWU3I4, B000ILIJE0]|[B000NWQXBA]|1.0               |78.98975109809663|0.01134383688600556 |
|[B000GQG7D2, B000Q032UY, B000PC54NG, B000NWU3I4, B000ILIJE0]            |[B000H9R1Q0]|1.0               

- Considering association rules with antecedent and precedent a single book

In [40]:
association_rules = model.associationRules

association_rules_single = association_rules.withColumn(
    "antecedent_single", 
    F.explode(association_rules.antecedent)
).withColumn(
    "consequent_single", 
    F.explode(association_rules.consequent)
)

association_rules_single.select("antecedent_single", "consequent_single", "confidence", "lift", "support").show(truncate=False)


+-----------------+-----------------+------------------+-----------------+--------------------+
|antecedent_single|consequent_single|confidence        |lift             |support             |
+-----------------+-----------------+------------------+-----------------+--------------------+
|B000NDSX6C       |B000GQG5MA       |0.9640522875816994|78.92355222311484|0.010936051899907321|
|B000GQG7D2       |B000GQG5MA       |0.9640522875816994|78.92355222311484|0.010936051899907321|
|B000H9R1Q0       |B000GQG5MA       |0.9640522875816994|78.92355222311484|0.010936051899907321|
|B000PC54NG       |B000GQG5MA       |0.9640522875816994|78.92355222311484|0.010936051899907321|
|B000NWU3I4       |B000GQG5MA       |0.9640522875816994|78.92355222311484|0.010936051899907321|
|B000ILIJE0       |B000GQG5MA       |0.9640522875816994|78.92355222311484|0.010936051899907321|
|B000NDSX6C       |B000Q032UY       |1.0               |79.57227138643069|0.01134383688600556 |
|B000GQG7D2       |B000Q032UY       |1.0

In [41]:
######################################

In [42]:
#A PRIORI ALGORITHMS 

In [51]:
# 1. Esplodiamo la colonna "items_list" (sostituendo "items_list" con il nome effettivo della tua colonna)
df_exploded = df_basket.select(F.explode("items").alias("item"))

# 2. Calcoliamo la frequenza di ciascun item (frequente itemset singolo)
item_counts = df_exploded.groupBy("item").count()

# 3. Filtriamo gli item con frequenza >= supporto minimo
total_transactions = df_filtered.count()  # Numero totale di transazioni
frequent_items = item_counts.filter(item_counts["count"] / total_transactions >= min_support)

# 4. Ora generiamo le combinazioni degli item frequenti (pairwise)
from itertools import combinations

# Creiamo una lista di item frequenti
frequent_items_list = frequent_items.select("item").rdd.flatMap(lambda x: x).collect()

# Calcoliamo le combinazioni di item
item_pairs = list(combinations(frequent_items_list, 2))

# 5. Calcoliamo la frequenza delle combinazioni
# Creiamo una colonna con tutte le combinazioni per ogni transazione
df_pairs = df_exploded.withColumn("pair", F.array(*[F.when(F.col("item") == item, F.lit(1)).otherwise(0) for item in frequent_items_list]))

# Ora possiamo contare le combinazioni e calcolare il supporto per ogni coppia
pair_counts = df_pairs.groupBy("pair").agg(F.sum("pair").alias("count"))

# 6. Filtriamo le coppie che non raggiungono il supporto minimo
frequent_pairs = pair_counts.filter(pair_counts["count"] / total_transactions >= min_support)

# Mostra i risultati
frequent_items.show()
frequent_pairs.show()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 322.0 failed 1 times, most recent failure: Lost task 1.0 in stage 322.0 (TID 1216) (192.168.1.51 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	... 32 more


In [None]:
# Generare item singoli e calcolare il supporto
item_counts = transactions.flatMap(lambda t: [(item, 1) for item in t]) \
                          .reduceByKey(lambda x, y: x + y)

# Filtrare per supporto minimo
frequent_items = item_counts.filter(lambda x: x[1] / total_transactions >= min_support) \
                            .map(lambda x: x[0]) \
                            .collect()

# Creare itemset frequenti
frequent_itemsets = []
k = 1
current_itemsets = [(item,) for item in frequent_items]  # Convertire in tuple

while current_itemsets:
    # Generare i candidati di livello k
    candidates = list(combinations(frequent_items, k))

    # Calcolare il supporto per ogni combinazione
    candidate_counts = transactions.flatMap(lambda t: [(c, 1) for c in candidates if set(c).issubset(set(t))]) \
                                   .reduceByKey(lambda x, y: x + y)

    # Filtrare per supporto minimo
    current_itemsets = candidate_counts.filter(lambda x: x[1] / total_transactions >= min_support) \
                                       .map(lambda x: x[0]) \
                                       .collect()

    # Aggiungere agli itemset frequenti
    frequent_itemsets.extend(current_itemsets)

    # Incrementare k per il prossimo ciclo
    k += 1

print("Frequent Itemsets:", frequent_itemsets)

In [43]:
from itertools import combinations

# Parametri
min_support = 0.01  # Soglia di supporto
min_confidence = 0.2  # Soglia di confidenza

def apriori(df, min_support):
    transactions = df.rdd.map(lambda x: x[1])  # Estrarre transazioni
    total_transactions = transactions.count()

    # Generare item singoli e calcolare il supporto
    item_counts = transactions.flatMap(lambda t: [(item, 1) for item in t]) \
                              .reduceByKey(lambda x, y: x + y)
    
    # Filtrare per supporto minimo
    frequent_items = item_counts.filter(lambda x: x[1] / total_transactions >= min_support) \
                                .map(lambda x: x[0]) \
                                .collect()

    # Creare itemset frequenti
    frequent_itemsets = []
    k = 1
    current_itemsets = [(item,) for item in frequent_items]  # Convertire in tuple

    while current_itemsets:
        # Generare i candidati di livello k
        candidates = list(combinations(frequent_items, k))

        # Calcolare il supporto per ogni combinazione
        candidate_counts = transactions.flatMap(lambda t: [(c, 1) for c in candidates if set(c).issubset(set(t))]) \
                                       .reduceByKey(lambda x, y: x + y)

        # Filtrare per supporto minimo
        current_itemsets = candidate_counts.filter(lambda x: x[1] / total_transactions >= min_support) \
                                           .map(lambda x: x[0]) \
                                           .collect()

        # Aggiungere agli itemset frequenti
        frequent_itemsets.extend(current_itemsets)

        # Incrementare k per il prossimo ciclo
        k += 1

    return frequent_itemsets


In [44]:
# Funzione per calcolare regole di associazione
def generate_rules(frequent_itemsets, df, min_confidence):
    transactions = df.rdd.map(lambda x: x[1])
    total_transactions = transactions.count()
    rules = []

    for itemset in frequent_itemsets:
        if len(itemset) > 1:
            for i in range(1, len(itemset)):
                for antecedent in combinations(itemset, i):
                    consequent = tuple(set(itemset) - set(antecedent))

                    # Calcolare supporto
                    support_itemset = transactions.filter(lambda t: set(itemset).issubset(set(t))).count() / total_transactions
                    support_antecedent = transactions.filter(lambda t: set(antecedent).issubset(set(t))).count() / total_transactions
                    confidence = support_itemset / support_antecedent if support_antecedent > 0 else 0

                    # Controllare la soglia di confidenza
                    if confidence >= min_confidence:
                        rules.append((antecedent, consequent, confidence))

    return rules

In [45]:
# Eseguire Apriori
frequent_itemsets_ap = apriori(df_filtered, min_support)
print("Frequent Itemsets:", frequent_itemsets_ap)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 268.0 failed 1 times, most recent failure: Lost task 5.0 in stage 268.0 (TID 1032) (192.168.1.51 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	... 32 more


In [None]:
import sys
print("Python Path:", sys.executable)

In [None]:
# Generare regole di associazione
association_rules_ap = generate_rules(frequent_itemsets_ap, df_filtered, min_confidence)
print("Association Rules:")
for rule in association_rules_ap:
    print(f"{rule[0]} → {rule[1]}, Confidenza: {rule[2]:.2f}")

In [None]:
#################################

In [None]:
#SON algorithm

In [None]:
# Parametri
total_transactions = df.count()  # Numero totale di transazioni
chunk_size = 2  # Numero di partizioni (simula i blocchi)

# Funzione per estrarre itemset frequenti localmente (fase 1)
def local_frequent_itemsets(partition, min_support, chunk_size):
    partition = list(partition)  # Convertire in lista (per evitare problemi con RDD)
    partition_size = len(partition)

    # Se la partizione è vuota, restituire nulla
    if partition_size == 0:
        return []

    # Calcolare supporto locale
    local_threshold = (partition_size / total_transactions) * min_support

    # Estrarre item unici
    items = [item for transaction in partition for item in transaction[1]]
    unique_items = set(items)

    # Contare item singoli
    item_counts = {item: items.count(item) for item in unique_items}

    # Filtrare per supporto minimo
    frequent_items = {k: v for k, v in item_counts.items() if v >= local_threshold}

    # Generare itemset più grandi
    k = 2
    current_itemsets = [(item,) for item in frequent_items]
    all_frequent_itemsets = set(frequent_items.keys())

    while current_itemsets:
        candidates = list(combinations(all_frequent_itemsets, k))

        # Contare le occorrenze nei blocchi locali
        candidate_counts = {c: sum(1 for transaction in partition if set(c).issubset(transaction[1])) for c in candidates}

        # Filtrare itemset frequenti locali
        current_itemsets = {k: v for k, v in candidate_counts.items() if v >= local_threshold}

        # Aggiungere agli itemset frequenti
        all_frequent_itemsets.update(current_itemsets.keys())
        k += 1

    return all_frequent_itemsets

# Funzione per filtrare itemset frequenti globalmente (fase 2)
def global_filter(itemsets, df, min_support):
    transactions = df.rdd.map(lambda x: x[1])
    total_transactions = transactions.count()

    # Calcolare il supporto globale
    global_counts = {itemset: transactions.filter(lambda t: set(itemset).issubset(set(t))).count() for itemset in itemsets}
    return {k: v for k, v in global_counts.items() if v / total_transactions >= min_support}

# Eseguire SON Algorithm
rdd = df.rdd.repartition(chunk_size)
local_frequent_sets = rdd.mapPartitions(lambda partition: local_frequent_itemsets(partition, min_support, chunk_size)).distinct()
global_frequent_sets = global_filter(local_frequent_sets.collect(), df, min_support)

# Stampare i risultati
print("Itemset frequenti globali:")
for itemset, count in global_frequent_sets.items():
    print(f"{itemset}: {count}")