# Amazon Books Reviews

This project applies **market-basket analysis** to a dataset of Amazon book reviews. The goal is to find association rules between books, in order to suggest to users the books they might be interested in, based on the reviews they have already written.

The dataset is downloaded from Kaggle via its API and unzipped into two files:
* `Books_rating.csv` contains the reviews of the books.
* `Books_data.csv` contains information about books.

In [1]:
! pip install kaggle



In [2]:
import os
# Kaggle API credentials: use your own key and never publish it in a notebook
os.environ['KAGGLE_USERNAME'] = "lorispalmarin"
os.environ['KAGGLE_KEY'] = "<your-kaggle-api-key>"
!kaggle datasets download --unzip mohamedbakhet/amazon-books-reviews

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0


In [3]:
from pyspark.sql import SparkSession
from itertools import combinations
from collections import defaultdict
import pandas as pd
from pyspark.sql.functions import col, lower

---

## Spark Setup

The first step is to create a Spark session, named "Books Reviews", with the `SparkSession` builder. The session's underlying `SparkContext` (`sc`) gives access to the lower-level RDD API used later for the market-basket algorithms.

In [4]:
spark = SparkSession.builder.appName("Books Reviews").getOrCreate()
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/26 17:29:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


---

## Data Upload and Pre-processing

The dataset files are now available in the Jupyter environment. Next, I load the data into a Spark DataFrame and perform some pre-processing.

### Books Ratings

In [5]:
date = spark.read.csv("Books_rating.csv", header=True, inferSchema=True)
date = date.withColumnRenamed("review/score", "review_score")  # rename for convenience: '/' is awkward to reference in SQL
date.show(5)

                                                                                

+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|        Id|               Title|Price|       User_id|         profileName|review/helpfulness|review_score|review/time|      review/summary|         review/text|
+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|1882931173|Its Only Art If I...| NULL| AVCGYZL8FQQTD|"Jim of Oz ""jim-...|               7/7|         4.0|  940636800|Nice collection o...|This is only for ...|
|0826414346|Dr. Seuss: Americ...| NULL|A30TK6U7DNS82R|       Kevin Killian|             10/10|         5.0| 1095724800|   Really Enjoyed It|I don't care much...|
|0826414346|Dr. Seuss: Americ...| NULL|A3UH4UZ4RSVO82|        John Granger|             10/11|         5.0| 1078790400|Essential for eve...|"If people become...|
|0826414346|Dr. Seuss: Ameri

---

### Sampling
To speed up operations during development, I introduced a `sampling` flag that keeps only a fraction (1%) of the users. This allows me to work with a smaller dataset and run each step faster.

If `sampling` is set to `True`, the dataset will be sampled. Otherwise, the dataset will be kept as it is.
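Sampling *users* rather than individual reviews is what keeps the baskets meaningful: every sampled user brings along all of their reviews, so no basket is truncated. Below is a minimal, Spark-free sketch of the same idea; the helper `sample_users` and the `(user_id, book_id)` tuple layout are hypothetical, and the per-user coin flips only mirror in spirit what `distinct().sample()` followed by the join does in Spark.

```python
import random

def sample_users(reviews, fraction, seed=42):
    """Keep every review of a random subset of users.

    `reviews` is a list of (user_id, book_id) pairs. Sampling whole users
    (instead of single reviews) keeps each sampled user's basket complete.
    """
    users = sorted({user for user, _ in reviews})  # distinct users, sorted for reproducibility
    rng = random.Random(seed)
    kept = {user for user in users if rng.random() < fraction}  # one coin flip per user
    return [(user, book) for user, book in reviews if user in kept]

reviews = [("u1", "b1"), ("u1", "b2"), ("u2", "b1"), ("u3", "b3"), ("u3", "b1")]
sampled = sample_users(reviews, fraction=0.5)
```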

In [6]:
sampling = False
sample_fraction = 0.01

if sampling:
    user_sample = date.select("User_id").distinct().sample(fraction=sample_fraction, seed=42)
    date = date.join(user_sample, on="User_id", how="inner")

---

### Dealing with Duplicates

Next, I deal with books that have the same title but different IDs, keeping a single ID per title. Titles are lower-cased first, so the match ignores differences in capitalisation.
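One detail worth noting: `first()` inside `agg` returns an arbitrary ID from each group, so the chosen ID can change between runs. A deterministic variant would pick, for example, the smallest ID per title (e.g. with `pyspark.sql.functions.min` instead of `first`). The pure-Python sketch below illustrates that rule on a toy list; the function `dedupe_by_title` and the `(book_id, title, user_id)` layout are hypothetical, and titles are assumed to be non-null.

```python
def dedupe_by_title(reviews):
    """Map every review to one canonical ID per lower-cased title.

    `reviews` is a list of (book_id, title, user_id) tuples. The canonical ID
    is the smallest ID seen for the title, so the result is deterministic.
    """
    canonical = {}
    for book_id, title, _ in reviews:
        key = title.lower()
        if key not in canonical or book_id < canonical[key]:
            canonical[key] = book_id
    # Rewrite each review with the canonical ID of its title
    return [(canonical[title.lower()], title.lower(), user) for _, title, user in reviews]

reviews = [("B2", "Dune", "u1"), ("B1", "DUNE", "u2"), ("B3", "Emma", "u1")]
deduped = dedupe_by_title(reviews)
# → [("B1", "dune", "u1"), ("B1", "dune", "u2"), ("B3", "emma", "u1")]
```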



In [7]:
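from pyspark.sql.functions import col, first

# Lower-case titles so that capitalisation differences do not create separate books
date = date.withColumn("title", lower(col("title")))

# Pick one ID per title (first() returns an arbitrary ID within each group)
df_unique = date.groupBy("Title").agg(first("Id").alias("New_Id"))

# Replace each review's original ID with the ID chosen for its title
df_final = date.join(df_unique, on="Title", how="left").drop("Id").withColumnRenamed("New_Id", "Id")

# Show the result
df_final.show()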



+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+----------+
|               title|Price|       User_id|         profileName|review/helpfulness|review_score|review/time|      review/summary|         review/text|        Id|
+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+----------+
|death by chocolat...| NULL|A1KXONFPU2XQ5K|    Stephanie Manley|              9/10|         5.0|  983577600|A must have for c...|If you are a choc...|B0002RQ1JU|
|death by chocolat...| NULL|A1OX82JPAQLL60|         rodboomboom|               6/6|         5.0| 1072483200|Talk About &quot;...|You won't find su...|B0002RQ1JU|
|death by chocolat...| NULL|          NULL|                NULL|               8/9|         5.0|  982886400|A Must-Have for C...|The one thing I h...|B0002RQ1JU|
|death by chocolat...| NULL|

                                                                                

In [8]:
print('Dataset has', df_final.count(), 'reviews.')



Dataset has 3000000 reviews.


                                                                                

I registered the DataFrame as a temporary SQL view (`ratings`) so that it can be queried with Spark SQL, and selected only the columns I need: `Id` and `User_id`.

In [9]:
df_final.createOrReplaceTempView("ratings")
query = "SELECT Id, User_id FROM ratings"
data = spark.sql(query)

In [10]:
data.show(5)



+----------+--------------+
|        Id|       User_id|
+----------+--------------+
|1882931173| AVCGYZL8FQQTD|
|B000J6DLBU|A1G37DFO8MQW0M|
|B000J6DLBU| AJ98YA4Y333BK|
|B000J6DLBU| AB68LG08VDFL3|
|B000J6DLBU|          NULL|
+----------+--------------+
only showing top 5 rows



                                                                                

The dataset contains **3 million** reviews. I will now remove duplicate (user, book) pairs and rows with null values.

In [11]:
data = data.dropDuplicates().cache()
data = data.dropna()

In [12]:
data.count()

25/03/26 17:30:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

2046358

After removing duplicates and null values, about **2.05 million** (user, book) pairs remain. Next, I check the schema of the DataFrame.

In [13]:
data.printSchema()

root
 |-- Id: string (nullable = true)
 |-- User_id: string (nullable = true)



Both columns are strings, as expected.

I will check the number of different users and books in the dataset.

In [14]:
print("Dataset contains", data.select("User_id").distinct().count(), "different users")
print("Dataset contains", data.select("Id").distinct().count(), "different books")


Dataset contains 1008423 different users
Dataset contains 203791 different books


### Books Data

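Next, I loaded the `Books_data.csv` file into a new Spark DataFrame. It contains descriptive information about the books (authors, publisher, categories), which will be useful for later analysis.

Note that with the default reader options some rows are parsed incorrectly: in the second row below, part of the description spills into the `authors` and following columns. This is likely because the file escapes quotes by doubling them (`""`), while Spark's CSV reader expects `\` as the escape character by default; passing `escape='"'` (and `multiLine=True` if descriptions contain line breaks) should fix it.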

In [15]:
book_info = spark.read.csv("Books_data.csv", header=True, inferSchema=True)
book_info.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+------------+
|               Title|         description|             authors|               image|         previewLink|           publisher| publishedDate|            infoLink|          categories|ratingsCount|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+------------+
|Its Only Art If I...|                NULL|    ['Julie Strain']|http://books.goog...|http://books.goog...|                NULL|          1996|http://books.goog...|['Comics & Graphi...|        NULL|
|Dr. Seuss: Americ...|"Philip Nel takes...| like that of Lew...| has changed lang...| giving us new wo...| inspiring artist...|['Philip Nel']|http://books.goog...|http://books.goog...|   A&C Black|
|Wonderful

---

## Algorithms

Now, I will implement two different algorithms to find frequent itemsets, from which association rules between books are derived. The first is the **A-Priori algorithm**, a classical algorithm for market-basket analysis. The second is the **SON algorithm**, which splits the data into chunks, mines each chunk in memory, and then verifies the resulting candidates over the whole dataset; it is designed for large, distributed datasets.

---

### 1. A-Priori Algorithm

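The A-Priori algorithm finds frequent itemsets while keeping memory usage low. It relies on *monotonicity*: if an itemset is frequent, all of its subsets are frequent too. Therefore, instead of counting every possible pair, it first counts single items (first pass) and then counts only the pairs made of two frequent items (second pass). Larger itemsets require one additional pass per size, as done for triplets below.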
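The two passes can be sketched in plain Python on a small in-memory list of baskets. This is an illustrative single-machine version (the function `apriori_pairs` is hypothetical), not the Spark code used below; it applies the same support threshold to singletons and pairs.

```python
from collections import Counter
from itertools import combinations

def apriori_pairs(baskets, support_threshold):
    """Two-pass A-Priori for frequent pairs (in-memory sketch)."""
    # Pass 1: count single items and keep the frequent ones
    item_counts = Counter(item for basket in baskets for item in set(basket))
    frequent_items = {item for item, c in item_counts.items() if c >= support_threshold}

    # Pass 2: count only pairs whose two items are both frequent (monotonicity)
    pair_counts = Counter()
    for basket in baskets:
        kept = sorted(set(basket) & frequent_items)  # sorted, so each pair has one canonical order
        pair_counts.update(combinations(kept, 2))
    frequent_pairs = {pair: c for pair, c in pair_counts.items() if c >= support_threshold}
    return frequent_items, frequent_pairs

baskets = [["a", "b", "c"], ["a", "b"], ["a", "c"], ["b", "d"]]
items, pairs = apriori_pairs(baskets, support_threshold=2)
```

Here `d` is discarded after the first pass, so the pair `(b, d)` is never counted.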

First, I built one basket per user (the list of books that user reviewed) and stored the baskets in an RDD:


In [16]:
book_baskets = (data.rdd.
                map(lambda row: (row["User_id"], row["Id"])).
                groupByKey().
                mapValues(list))
book_baskets.take(5)

                                                                                

[('A2JENYWRQHT8TW', ['0759614512']),
 ('A3ANBQAF4OY0P1', ['1416509879']),
 ('AFW72F92KX7AJ', ['B0002ST9TI']),
 ('ACTMM7G8M2LW3', ['B000NQ655K', 'B0006AWDI6', 'B000OW4E7O']),
 ('A23KYX0MQS9S4J', ['1588209202'])]

In [17]:
number_of_baskets = book_baskets.count()
number_of_baskets

1008423

#### First pass: Finding frequent singletons

In this step, I find the singletons (single books) whose count reaches a support threshold: I count the occurrences of each book and keep only the frequent ones.

First, I counted all singletons:

In [18]:
single_counts = (
    book_baskets.
    flatMap(lambda r:r[1]). # isolate the books of each basket
    map(lambda r: (r,1)). # count = 1 for each book occurrence
    reduceByKey(lambda a, b: a + b). # sum the counts per book
    collect() # bring the (book, count) list to the driver
)
single_counts

                                                                                

[('B000OW4E7O', 487),
 ('B00005X55Y', 190),
 ('B000NKGYNE', 44),
 ('B000PC55CG', 58),
 ('0553289713', 1578),
 ('B000KIIVNU', 69),
 ('B000PMCF1A', 2125),
 ('B000133Q02', 90),
 ('B000OUU17S', 15),
 ('B0002XH6T8', 1408),
 ('B000IZQPCY', 123),
 ('1555534856', 1),
 ('B0001FZGSA', 52),
 ('B000PAK6M2', 214),
 ('B000FZ358E', 298),
 ('B000EVCRJ2', 60),
 ('B000HZ9A2W', 767),
 ('B000CC49LC', 67),
 ('B000K09Y66', 24),
 ('B000GLI9HY', 677),
 ('B0006DI170', 30),
 ('0786254521', 8),
 ('B000GYVRI4', 255),
 ('B00029ZWQG', 94),
 ('0896211150', 89),
 ('B000JPE8OA', 72),
 ('B000MK50EE', 825),
 ('B000OUIL1G', 107),
 ('B000U2H682', 54),
 ('B000CEXDTO', 412),
 ('B0001KIH22', 307),
 ('078576223X', 91),
 ('B0007ZNUY6', 29),
 ('0802135331', 40),
 ('1597370037', 577),
 ('1573220876', 212),
 ('1931018146', 78),
 ('1578567297', 31),
 ('0898455537', 198),
 ('B0002X7W3S', 38),
 ('B00069QMWA', 127),
 ('1860497624', 37),
 ('B000P8PQQK', 90),
 ('B000H3TE3Y', 66),
 ('B0007J40FK', 28),
 ('1400033217', 7),
 ('B0006CZNKY',

Then, I kept only the singletons whose count reaches the support threshold, set to 0.1% of the baskets (0.001 × 1,008,423 ≈ 1,008 baskets):

In [19]:
percentage_of_total = 0.001  # support threshold: 0.1% of all baskets
support_threshold = percentage_of_total * number_of_baskets
frequent_singletons = list(filter(lambda r: r[1] >= support_threshold, single_counts))
print("There is a total of", len(frequent_singletons), "frequent singletons")

There is a total of 81 frequent singletons


In [20]:
frequent_singletons_list = [x[0] for x in frequent_singletons]

In [21]:
frequent_singletons

[('0553289713', 1578),
 ('B000PMCF1A', 2125),
 ('B0002XH6T8', 1408),
 ('B000K0DB8I', 1469),
 ('B000NWU3I4', 3562),
 ('B000N76ZCC', 1493),
 ('B000GROP62', 1440),
 ('0140860428', 1160),
 ('0582528259', 1158),
 ('0141804459', 1759),
 ('B000I1VJLA', 1836),
 ('1856867323', 1232),
 ('B0007379RC', 1187),
 ('B000Q6XPDW', 1242),
 ('B000EANQJ8', 1389),
 ('B000NPEWHE', 1543),
 ('B000NHNM3C', 1425),
 ('9562910334', 1441),
 ('1847022251', 1158),
 ('0140351310', 1158),
 ('B000MOOAJG', 2644),
 ('B0006IU3ZS', 2961),
 ('B000GSQ910', 1209),
 ('B000FC2QGE', 1423),
 ('B00086Q244', 1040),
 ('8188280046', 1754),
 ('B000H7EO2G', 1848),
 ('1932100385', 1458),
 ('0143057804', 1011),
 ('B000KISQC6', 1156),
 ('B000P1QRII', 1875),
 ('B0000YSH5G', 1234),
 ('B000FFJRI6', 1846),
 ('B000TKO3EA', 1010),
 ('B0007C10MS', 1404),
 ('B000I3NFKG', 2134),
 ('B0000CO4JZ', 1419),
 ('1556909330', 1027),
 ('0613334582', 1925),
 ('B000HJNEYS', 1629),
 ('B000J521DU', 2133),
 ('B000ETWJ74', 1467),
 ('0312857055', 1252),
 ('03304879

#### Second pass: finding frequent pairs

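Using MapReduce, I count the occurrences of pairs in the baskets, considering only books that are frequent singletons: by monotonicity, a pair containing an infrequent book cannot be frequent.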



In [22]:
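frequent_set = set(frequent_singletons_list)  # a set gives O(1) membership tests

pairs_counts = (
    book_baskets
    .map(lambda r: r[1])                                   # keep only the list of books
    .map(lambda r: [x for x in r if x in frequent_set])    # drop infrequent books
    .filter(lambda r: len(r) >= 2)                         # at least two books are needed to form a pair
    .flatMap(lambda r: list(combinations(sorted(r), 2)))   # candidate pairs in canonical (sorted) order
    .map(lambda r: (r, 1))
    .reduceByKey(lambda a, b: a + b)
    .collect()
)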
pairs_counts

                                                                                

[(('0553289713', 'B000HKLROQ'), 23),
 (('B0002XH6T8', 'B000HKLROQ'), 36),
 (('B00005WNTY', 'B000TKO3EA'), 35),
 (('B00005WNTY', 'B0007C10MS'), 24),
 (('B00005WNTY', 'B000I3NFKG'), 28),
 (('B00005WNTY', 'B0000CO4JZ'), 24),
 (('B000GQG5MA', 'B000I3NFKG'), 29),
 (('0553289713', 'B000NDSX6C'), 10),
 (('B000N76ZCC', 'B000NDSX6C'), 53),
 (('B000NDSX6C', 'B000Q032UY'), 3511),
 (('B000GROP62', 'B000NDSX6C'), 38),
 (('1556909330', 'B000TZ19TC'), 30),
 (('0435126024', '0451518845'), 1158),
 (('0140860428', '0435126024'), 1158),
 (('0435126024', '0460112872'), 1158),
 (('B00005WNTY', 'B000HJNEYS'), 66),
 (('B0006Y8M7S', 'B000FC2QGE'), 27),
 (('B000GSDG8E', 'B000N6R4AA'), 10),
 (('B0006Y8M7S', 'B00086Q244'), 1034),
 (('1844560333', '8188280046'), 1754),
 (('0435126075', '1844560333'), 1756),
 (('0141804459', '0435126075'), 1756),
 (('9562910334', 'B000BI4160'), 25),
 (('9562910334', 'B000PIIMPW'), 25),
 (('B000H7EO2G', 'B000I1VJLA'), 1831),
 (('B000I1VJLA', 'B000NDSX6C'), 225),
 (('0003300277', 'B

I then keep only the pairs whose count reaches the same support threshold:

In [23]:
frequent_pairs = list(filter(lambda r: r[1] >= support_threshold, pairs_counts))
print("There is a total of", len(frequent_pairs), "frequent pairs")

There is a total of 84 frequent pairs


Most frequent pairs of books:

In [24]:
sorted(frequent_pairs, key=lambda x: x[1], reverse=True)

[(('B000ILIJE0', 'B000NDSX6C'), 3574),
 (('B000NDSX6C', 'B000NWU3I4'), 3562),
 (('B000ILIJE0', 'B000NWU3I4'), 3561),
 (('B000NDSX6C', 'B000Q032UY'), 3511),
 (('B000ILIJE0', 'B000Q032UY'), 3511),
 (('B000NWU3I4', 'B000Q032UY'), 3505),
 (('B000GQG5MA', 'B000NDSX6C'), 3408),
 (('B000GQG5MA', 'B000ILIJE0'), 3407),
 (('B000GQG5MA', 'B000NWU3I4'), 3402),
 (('B000GQG5MA', 'B000Q032UY'), 3354),
 (('B000I3NFKG', 'B000PMCF1A'), 2125),
 (('B000FAIRN2', 'B000J1OR0Y'), 1858),
 (('B000BI4160', 'B000FAIRN2'), 1857),
 (('B000BI4160', 'B000J1OR0Y'), 1857),
 (('B000FAIRN2', 'B000GQK706'), 1852),
 (('B000GQK706', 'B000J1OR0Y'), 1851),
 (('B000J1OR0Y', 'B000PIIMPW'), 1850),
 (('B000FAIRN2', 'B000PIIMPW'), 1850),
 (('B000BI4160', 'B000GQK706'), 1850),
 (('B000BI4160', 'B000PIIMPW'), 1849),
 (('B000H7EO2G', 'B000J1OR0Y'), 1848),
 (('B000FAIRN2', 'B000H7EO2G'), 1848),
 (('B000H7EO2G', 'B000PIIMPW'), 1847),
 (('B000BI4160', 'B000H7EO2G'), 1847),
 (('B000GQK706', 'B000PIIMPW'), 1843),
 (('B000GQK706', 'B000H7E

#### Third pass: finding frequent triplets
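In the textbook version of A-Priori, the candidate triplets for the third pass are only those whose three sub-pairs are all frequent. The Spark cell below is looser: it counts every triplet of frequent singletons and filters afterwards, which yields the same frequent triplets (a frequent triplet always has frequent sub-pairs) at the cost of counting more candidates. A minimal pure-Python sketch of the pruned candidate generation and counting, with hypothetical helper names `candidate_triplets` and `count_triplets`:

```python
from collections import Counter
from itertools import combinations

def candidate_triplets(frequent_pairs):
    """Triplets whose three 2-item subsets are all frequent pairs."""
    pairs = {tuple(sorted(p)) for p in frequent_pairs}
    items = sorted({item for pair in pairs for item in pair})
    candidates = set()
    for a, b in pairs:
        for c in items:
            if c in (a, b):
                continue
            triple = tuple(sorted((a, b, c)))
            # A-Priori pruning: every pair inside the triple must be frequent
            if all(sub in pairs for sub in combinations(triple, 2)):
                candidates.add(triple)
    return candidates

def count_triplets(baskets, candidates, support_threshold):
    """Count only candidate triplets and keep those reaching the threshold."""
    counts = Counter()
    for basket in baskets:
        for triple in combinations(sorted(set(basket)), 3):
            if triple in candidates:
                counts[triple] += 1
    return {t: c for t, c in counts.items() if c >= support_threshold}

pairs = [("a", "b"), ("a", "c"), ("b", "c"), ("b", "d")]
candidates = candidate_triplets(pairs)
baskets = [["a", "b", "c"], ["c", "b", "a", "d"], ["a", "b"], ["b", "d", "a"]]
frequent = count_triplets(baskets, candidates, support_threshold=2)
```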

In [25]:
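frequent_set = set(frequent_singletons_list)  # a set gives O(1) membership tests

triplets_counts = (
    book_baskets
    .map(lambda r: r[1])
    .map(lambda r: [x for x in r if x in frequent_set])    # drop infrequent books
    .filter(lambda r: len(r) >= 3)                         # a triplet needs at least three frequent books
    .flatMap(lambda r: list(combinations(sorted(r), 3)))   # candidate triplets in canonical (sorted) order
    .map(lambda r: (r, 1))
    .reduceByKey(lambda a, b: a + b)
    .collect()
)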
triplets_counts

                                                                                

[(('0553289713', 'B000PMCF1A', 'B000Q6XPDW'), 5),
 (('0553289713', 'B0007C10MS', 'B000PMCF1A'), 18),
 (('0553289713', 'B000I3NFKG', 'B000PMCF1A'), 55),
 (('0553289713', 'B0000CO4JZ', 'B000PMCF1A'), 18),
 (('0553289713', 'B0002XH6T8', 'B000Q6XPDW'), 7),
 (('0553289713', 'B0002XH6T8', 'B000EANQJ8'), 6),
 (('0553289713', 'B0000CO4JZ', 'B0002XH6T8'), 6),
 (('0460872702', '0553289713', 'B00005WNTY'), 6),
 (('0553289713', 'B000HKLROQ', 'B000TKO3EA'), 23),
 (('0553289713', 'B000HKLROQ', 'B000I3NFKG'), 9),
 (('B0002XH6T8', 'B000PMCF1A', 'B000Q6XPDW'), 6),
 (('B0002XH6T8', 'B0007C10MS', 'B000PMCF1A'), 5),
 (('B0002XH6T8', 'B000I3NFKG', 'B000PMCF1A'), 13),
 (('0330487965', 'B00005WNTY', 'B000PMCF1A'), 6),
 (('B00005WNTY', 'B000HKLROQ', 'B000PMCF1A'), 9),
 (('0460872702', 'B000EANQJ8', 'B000PMCF1A'), 120),
 (('0330487965', 'B00005WNTY', 'B0002XH6T8'), 71),
 (('B00005WNTY', 'B0002XH6T8', 'B000HKLROQ'), 13),
 (('B0002XH6T8', 'B000HKLROQ', 'B000TKO3EA'), 36),
 (('B0002XH6T8', 'B000HKLROQ', 'B000I3NF

In [26]:
frequent_triplets = list(filter(lambda r: r[1] >= support_threshold, triplets_counts))
print("There is a total of", len(frequent_triplets), "frequent triplets")

There is a total of 111 frequent triplets


In [27]:
sorted(frequent_triplets, key=lambda x: x[1], reverse=True)

[(('B000ILIJE0', 'B000NDSX6C', 'B000NWU3I4'), 3561),
 (('B000ILIJE0', 'B000NDSX6C', 'B000Q032UY'), 3511),
 (('B000ILIJE0', 'B000NWU3I4', 'B000Q032UY'), 3505),
 (('B000NDSX6C', 'B000NWU3I4', 'B000Q032UY'), 3505),
 (('B000GQG5MA', 'B000ILIJE0', 'B000NDSX6C'), 3407),
 (('B000GQG5MA', 'B000NDSX6C', 'B000NWU3I4'), 3402),
 (('B000GQG5MA', 'B000ILIJE0', 'B000NWU3I4'), 3401),
 (('B000GQG5MA', 'B000NDSX6C', 'B000Q032UY'), 3354),
 (('B000GQG5MA', 'B000ILIJE0', 'B000Q032UY'), 3354),
 (('B000GQG5MA', 'B000NWU3I4', 'B000Q032UY'), 3348),
 (('B000BI4160', 'B000FAIRN2', 'B000J1OR0Y'), 1857),
 (('B000FAIRN2', 'B000GQK706', 'B000J1OR0Y'), 1851),
 (('B000FAIRN2', 'B000J1OR0Y', 'B000PIIMPW'), 1850),
 (('B000BI4160', 'B000FAIRN2', 'B000GQK706'), 1850),
 (('B000BI4160', 'B000GQK706', 'B000J1OR0Y'), 1850),
 (('B000BI4160', 'B000FAIRN2', 'B000PIIMPW'), 1849),
 (('B000BI4160', 'B000J1OR0Y', 'B000PIIMPW'), 1849),
 (('B000FAIRN2', 'B000H7EO2G', 'B000J1OR0Y'), 1848),
 (('B000BI4160', 'B000H7EO2G', 'B000J1OR0Y'), 

---

### 2. SON Algorithm

The **SON (Savasere, Omiecinski, and Navathe) Algorithm** is a scalable approach for **frequent itemset mining in large-scale datasets**, particularly suited for distributed computing environments. Unlike the standard **A-Priori Algorithm**, which scans the entire dataset multiple times, SON leverages a **divide-and-conquer strategy** by breaking the dataset into **independent chunks** that are processed in parallel.

This method allows **local frequent itemsets** to be identified in each chunk before combining them to find **global frequent itemsets** across the entire dataset. By working on smaller partitions first, SON reduces memory consumption and improves efficiency in distributed systems.

When implemented with **MapReduce**, the algorithm is executed in two **MapReduce passes**:

1. **First Pass:** Identifies locally frequent itemsets within each chunk and merges them into a set of candidate itemsets.
2. **Second Pass:** Computes the **global support** of the candidate itemsets across all data chunks and extracts the final **frequent itemsets**.

This approach significantly improves **scalability** by distributing computation across multiple nodes, making it well-suited for handling massive datasets in a parallel computing environment.
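The two phases can be sketched on a single machine by splitting a list of baskets into chunks. The local threshold is the global one scaled by the chunk's share of the baskets; this guarantees no false negatives, because an itemset that is frequent overall must reach the scaled threshold in at least one chunk. The sketch below uses hypothetical helpers `local_frequent` and `son`, is limited to itemsets of up to two items as in the Spark implementation below, and counts every itemset in a chunk by brute force rather than running A-Priori inside each chunk.

```python
from collections import Counter
from itertools import combinations

def local_frequent(chunk, local_threshold, max_size=2):
    """Phase 1 (one chunk): itemsets of size <= max_size frequent within the chunk."""
    counts = Counter()
    for basket in chunk:
        items = sorted(set(basket))
        for k in range(1, max_size + 1):
            counts.update(combinations(items, k))
    return {itemset for itemset, c in counts.items() if c >= local_threshold}

def son(baskets, support_threshold, num_chunks=2, max_size=2):
    chunk_size = -(-len(baskets) // num_chunks)  # ceiling division
    chunks = [baskets[i:i + chunk_size] for i in range(0, len(baskets), chunk_size)]

    # Phase 1: the union of locally frequent itemsets gives the global candidates
    candidates = set()
    for chunk in chunks:
        local_threshold = support_threshold * len(chunk) / len(baskets)
        candidates |= local_frequent(chunk, local_threshold, max_size)

    # Phase 2: count every candidate over all baskets and keep the truly frequent ones
    counts = Counter()
    for basket in baskets:
        items = set(basket)
        for itemset in candidates:
            if items.issuperset(itemset):
                counts[itemset] += 1
    return {itemset: c for itemset, c in counts.items() if c >= support_threshold}

baskets = [["a", "b"], ["a", "b"], ["a", "c"], ["b", "c"], ["a", "b", "c"], ["d"]]
result = son(baskets, support_threshold=3, num_chunks=2)
```

Phase 2 is what removes false positives such as `(b, c)`, which is locally frequent in the second chunk but appears in only two baskets overall.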

In [28]:
# Collect the baskets to the driver as a list of book lists (user IDs dropped)
SON_baskets = book_baskets.collect()
SON_baskets = [basket for _, basket in SON_baskets]

                                                                                

First, I **parallelised** the baskets into 10 partitions and set the **minimum support**, both global and local (the global threshold scaled by the fraction of baskets in each partition):

In [29]:
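# Parallelize the baskets into 10 partitions.
# Note: parallelizing a large local list ships the data inside the tasks, which triggers
# the "task of very large size" warnings below; building the RDD with
# book_baskets.values().repartition(10) would keep the data distributed instead.
rdd = spark.sparkContext.parallelize(SON_baskets, numSlices=10)

# Set local and global support
s = support_threshold  # Global support
p = 1 / rdd.getNumPartitions()  # Fraction of data in each partition (assumes equally sized partitions)
local_s = s * p  # Local support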

In the **first phase**, I find the locally frequent itemsets (singletons and pairs) in each chunk and merge them into a set of global candidates:

In [30]:
### FIRST PHASE MAPREDUCE: FIND-LOCAL FREQUENT ITEMSETS ###

def find_frequent_itemsets(partition):
    partition_list = list(partition)  # conversion to list for iteration
    counts = defaultdict(int)

    # creating itemsets
    for basket in partition_list:
        for item in basket:
            counts[frozenset([item])] += 1  # Single itemsets
        for pair in combinations(basket, 2):  # 2 element itemsets
            counts[frozenset(pair)] += 1

    # Find locally frequent itemsets
    frequent_itemsets = [itemset for itemset, count in counts.items() if count >= local_s]

    return [(itemset, 1) for itemset in frequent_itemsets]

# MAP 1: Find local candidates
candidates_rdd = rdd.mapPartitions(find_frequent_itemsets)

# REDUCE 1: Merge local candidates to get global candidates
candidates = candidates_rdd.distinct().collect()
candidates_set = set(candidates)  # Conversion to set


25/03/26 17:30:22 WARN TaskSetManager: Stage 100 contains a task of very large size (3267 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Below is a sample of the candidate itemsets produced by the first phase. In the second phase, I count each candidate across all partitions and keep only those whose total count reaches the global support:

In [31]:
candidates_set

{(frozenset({'B0007C10MS'}), 1),
 (frozenset({'B000C1JK1W'}), 1),
 (frozenset({'9626341823', 'B000NYRZ42'}), 1),
 (frozenset({'B0006DXUU8', 'B000NYRZ42'}), 1),
 (frozenset({'B000J33RXK', 'B000NQ9QF6'}), 1),
 (frozenset({'B000H7EO2G', 'B000J1OR0Y'}), 1),
 (frozenset({'B000JQXNSQ'}), 1),
 (frozenset({'B000ILIJE0', 'B000NWU3I4'}), 1),
 (frozenset({'B000I929MA', 'B000JVNX4K'}), 1),
 (frozenset({'1593355548', 'B000P4Q3JS'}), 1),
 (frozenset({'9562910334'}), 1),
 (frozenset({'B0006BUMYC', 'B000P6L66Q'}), 1),
 (frozenset({'0395051029', '1593355548'}), 1),
 (frozenset({'B000FELJBU'}), 1),
 (frozenset({'0808510258', 'B000NNOTXI'}), 1),
 (frozenset({'B0007K33F2', 'B0008C546U'}), 1),
 (frozenset({'B000Q6XPDW'}), 1),
 (frozenset({'B000GKURY8'}), 1),
 (frozenset({'0395051029', 'B000P4Q3JS'}), 1),
 (frozenset({'0435126024', '1847022251'}), 1),
 (frozenset({'B000FFJRI6'}), 1),
 (frozenset({'1593355548'}), 1),
 (frozenset({'0140860096', '0435120956'}), 1),
 (frozenset({'0435126024', '0460112872'}), 1)

In [32]:
### SECOND PHASE MAPREDUCE: COMPUTE GLOBAL SUPPORT ###

def count_candidates(partition):
    counts = defaultdict(int)

    for basket in partition:
        basket_set = set(basket)  # build the set once per basket, not once per candidate
        for itemset, _ in candidates_set:  # candidates are (frozenset, 1) tuples
            if itemset.issubset(basket_set):  # is the candidate contained in the basket?
                counts[itemset] += 1

    return counts.items()

# MAP 2: Count frequency of candidate itemsets in each data chunk
counts_rdd = rdd.mapPartitions(count_candidates)

# REDUCE 2: Sum the partial counts and keep the globally frequent itemsets
frequent_itemsets = (counts_rdd.reduceByKey(lambda x, y: x + y)
                               .filter(lambda x: x[1] >= s)
                               .collect())

25/03/26 17:32:02 WARN TaskSetManager: Stage 102 contains a task of very large size (3267 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Below are the **results**, sorted by descending support.

**Globally frequent itemsets**:

In [33]:
sorted(frequent_itemsets, key=lambda x: x[1], reverse=True)

[(frozenset({'B000IEZE3G'}), 3663),
 (frozenset({'B000NDSX6C'}), 3577),
 (frozenset({'B000ILIJE0'}), 3576),
 (frozenset({'B000ILIJE0', 'B000NDSX6C'}), 3574),
 (frozenset({'B000NWU3I4'}), 3562),
 (frozenset({'B000NDSX6C', 'B000NWU3I4'}), 3562),
 (frozenset({'B000ILIJE0', 'B000NWU3I4'}), 3561),
 (frozenset({'B000NDSX6C', 'B000Q032UY'}), 3511),
 (frozenset({'B000ILIJE0', 'B000Q032UY'}), 3511),
 (frozenset({'B000Q032UY'}), 3511),
 (frozenset({'B000NWU3I4', 'B000Q032UY'}), 3505),
 (frozenset({'B000GQG5MA', 'B000NDSX6C'}), 3408),
 (frozenset({'B000GQG5MA'}), 3408),
 (frozenset({'B000GQG5MA', 'B000ILIJE0'}), 3407),
 (frozenset({'B000GQG5MA', 'B000NWU3I4'}), 3402),
 (frozenset({'B000GQG5MA', 'B000Q032UY'}), 3354),
 (frozenset({'B0006IU3ZS'}), 2961),
 (frozenset({'B000MOOAJG'}), 2644),
 (frozenset({'B000GSDG8E'}), 2252),
 (frozenset({'B000I3NFKG'}), 2134),
 (frozenset({'B000J521DU'}), 2133),
 (frozenset({'B000PMCF1A'}), 2125),
 (frozenset({'B000I3NFKG', 'B000PMCF1A'}), 2125),
 (frozenset({'0613

**2-element frequent itemsets**:

In [34]:
filtered_itemsets = [itemset for itemset in frequent_itemsets if len(itemset[0]) == 2]
sorted(filtered_itemsets, key=lambda x: x[1], reverse=True)

[(frozenset({'B000ILIJE0', 'B000NDSX6C'}), 3574),
 (frozenset({'B000NDSX6C', 'B000NWU3I4'}), 3562),
 (frozenset({'B000ILIJE0', 'B000NWU3I4'}), 3561),
 (frozenset({'B000NDSX6C', 'B000Q032UY'}), 3511),
 (frozenset({'B000ILIJE0', 'B000Q032UY'}), 3511),
 (frozenset({'B000NWU3I4', 'B000Q032UY'}), 3505),
 (frozenset({'B000GQG5MA', 'B000NDSX6C'}), 3408),
 (frozenset({'B000GQG5MA', 'B000ILIJE0'}), 3407),
 (frozenset({'B000GQG5MA', 'B000NWU3I4'}), 3402),
 (frozenset({'B000GQG5MA', 'B000Q032UY'}), 3354),
 (frozenset({'B000I3NFKG', 'B000PMCF1A'}), 2125),
 (frozenset({'B000FAIRN2', 'B000J1OR0Y'}), 1858),
 (frozenset({'B000BI4160', 'B000FAIRN2'}), 1857),
 (frozenset({'B000BI4160', 'B000J1OR0Y'}), 1857),
 (frozenset({'B000FAIRN2', 'B000GQK706'}), 1852),
 (frozenset({'B000GQK706', 'B000J1OR0Y'}), 1851),
 (frozenset({'B000FAIRN2', 'B000PIIMPW'}), 1850),
 (frozenset({'B000BI4160', 'B000GQK706'}), 1850),
 (frozenset({'B000J1OR0Y', 'B000PIIMPW'}), 1850),
 (frozenset({'B000BI4160', 'B000PIIMPW'}), 1849),


---
## Association rules

Association rules are used to uncover relationships between items in large transactional datasets. They are fundamental in market-basket analysis, helping to identify patterns in purchasing behavior. Given a set of frequent itemsets, association rules allow us to infer how the presence of one item (or a set of items) influences the likelihood of another item appearing in the same transaction.

The key metrics used to evaluate association rules are:

* **Support**: Measures how frequently an itemset appears in the dataset.
$$
\text{Support}(A) = \frac{\text{Count}(A)}{\text{Total baskets}}
$$
* **Confidence**: Estimates the probability that B appears in a basket given that A appears in it, where $A \cup B$ denotes the itemset containing both A and B.
$$
\text{Confidence}(A \Rightarrow B) = \frac{\text{Support}(A \cup B)}{\text{Support}(A)}
$$
* **Lift**: Indicates how much more likely B is to appear together with A than if the two were independent: a lift of 1 means independence, a lift above 1 a positive association.
$$
\text{Lift}(A \Rightarrow B) = \frac{\text{Confidence}(A \Rightarrow B)}{\text{Support}(B)}
$$
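As a worked example, the helper below (hypothetical name `rule_metrics`) computes the three metrics from raw basket counts. It is applied to the pair `B000HKLROQ`, `B000TKO3EA` reported later in this notebook (1,010 baskets each, always together, out of 1,008,423 baskets), and to a toy pair of independent items, whose lift should be 1.

```python
def rule_metrics(count_a, count_b, count_ab, n_baskets):
    """Support, confidence and lift of the rule A => B from raw basket counts."""
    support_a = count_a / n_baskets
    support_b = count_b / n_baskets
    support_ab = count_ab / n_baskets     # fraction of baskets containing both A and B
    confidence = support_ab / support_a   # estimate of P(B | A)
    lift = confidence / support_b         # P(B | A) / P(B)
    return support_ab, confidence, lift

# Pair from this notebook: both books appear in the same 1,010 baskets
support_ab, confidence, lift = rule_metrics(count_a=1010, count_b=1010, count_ab=1010, n_baskets=1008423)
print(f"support={support_ab:.6f} confidence={confidence:.3f} lift={lift:.1f}")

# Toy independent items: P(A)=0.1, P(B)=0.05, P(A and B)=0.005
_, _, lift_independent = rule_metrics(count_a=100, count_b=50, count_ab=5, n_baskets=1000)
```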

By analyzing these metrics, we can determine which itemsets provide the most meaningful associations and can be used for applications such as recommendation systems and targeted marketing.

---

#### Association rules from pairs

First, I created two pandas DataFrames holding the frequent singletons and pairs, and computed their support:



In [35]:
# Reuse the basket count computed earlier (no need to count the RDD again)
total_baskets = number_of_baskets

# Dataframe for frequent singletons and compute support
df_singles = pd.DataFrame(frequent_singletons, columns=['Item', 'Count'])
df_singles['Support'] = df_singles['Count'] / total_baskets
df_singles

|     | Item       | Count | Support  |
|----:|------------|------:|---------:|
| 0   | 0553289713 | 1578  | 0.001565 |
| 1   | B000PMCF1A | 2125  | 0.002107 |
| 2   | B0002XH6T8 | 1408  | 0.001396 |
| 3   | B000K0DB8I | 1469  | 0.001457 |
| 4   | B000NWU3I4 | 3562  | 0.003532 |
| ... | ...        | ...   | ...      |
| 76  | B000TZ19TC | 1146  | 0.001136 |
| 77  | 1587263971 | 1165  | 0.001155 |
| 78  | B000FAIRN2 | 1859  | 0.001843 |
| 79  | B000GQK706 | 1852  | 0.001837 |


In [36]:
# Dataframe for frequent pairs and compute support
df_pairs = pd.DataFrame(frequent_pairs, columns=['Pair', 'Count'])
df_pairs['Support'] = df_pairs['Count'] / total_baskets
df_pairs

|     | Pair                     | Count | Support  |
|----:|--------------------------|------:|---------:|
| 0   | (B000NDSX6C, B000Q032UY) | 3511  | 0.003482 |
| 1   | (0435126024, 0451518845) | 1158  | 0.001148 |
| 2   | (0140860428, 0435126024) | 1158  | 0.001148 |
| 3   | (0435126024, 0460112872) | 1158  | 0.001148 |
| 4   | (B0006Y8M7S, B00086Q244) | 1034  | 0.001025 |
| ... | ...                      | ...   | ...      |
| 79  | (0140351310, 0460112872) | 1158  | 0.001148 |
| 80  | (B000BI4160, B000I1VJLA) | 1833  | 0.001818 |
| 81  | (B000FAIRN2, B000H7EO2G) | 1848  | 0.001833 |
| 82  | (B000GQK706, B000H7EO2G) | 1841  | 0.001826 |


In [37]:
# Split elements in pairs
df_pairs[['Item_A', 'Item_B']] = pd.DataFrame(df_pairs['Pair'].tolist(), index=df_pairs.index)
df_pairs.drop(columns=['Pair'], inplace=True)
df_pairs

|     | Count | Support  | Item_A     | Item_B     |
|----:|------:|---------:|------------|------------|
| 0   | 3511  | 0.003482 | B000NDSX6C | B000Q032UY |
| 1   | 1158  | 0.001148 | 0435126024 | 0451518845 |
| 2   | 1158  | 0.001148 | 0140860428 | 0435126024 |
| 3   | 1158  | 0.001148 | 0435126024 | 0460112872 |
| 4   | 1034  | 0.001025 | B0006Y8M7S | B00086Q244 |
| ... | ...   | ...      | ...        | ...        |
| 79  | 1158  | 0.001148 | 0140351310 | 0460112872 |
| 80  | 1833  | 0.001818 | B000BI4160 | B000I1VJLA |
| 81  | 1848  | 0.001833 | B000FAIRN2 | B000H7EO2G |
| 82  | 1841  | 0.001826 | B000GQK706 | B000H7EO2G |


Then, I merged the single-item counts into the pairs table, so that each pair carries the support of both of its items, which is needed to compute confidence and lift:

In [38]:
# Look up the individual count and support of each item of the pair
singles_A = df_singles.rename(columns={'Item': 'Item_A', 'Count': 'Count_A', 'Support': 'Support_A'})
singles_B = df_singles.rename(columns={'Item': 'Item_B', 'Count': 'Count_B', 'Support': 'Support_B'})

df_pairs = (
    df_pairs
    .rename(columns={'Count': 'Count_AB', 'Support': 'Support_AB'})
    .merge(singles_A, on='Item_A', how='left')
    .merge(singles_B, on='Item_B', how='left')
)

In [39]:
df_pairs

|     | Count_AB | Support_AB | Item_A     | Item_B     | Count_A | Support_A | Count_B | Support_B |
|----:|---------:|-----------:|------------|------------|--------:|----------:|--------:|----------:|
| 0   | 3511     | 0.003482   | B000NDSX6C | B000Q032UY | 3577    | 0.003547  | 3511    | 0.003482  |
| 1   | 1158     | 0.001148   | 0435126024 | 0451518845 | 1158    | 0.001148  | 1158    | 0.001148  |
| 2   | 1158     | 0.001148   | 0140860428 | 0435126024 | 1160    | 0.001150  | 1158    | 0.001148  |
| 3   | 1158     | 0.001148   | 0435126024 | 0460112872 | 1158    | 0.001148  | 1158    | 0.001148  |
| 4   | 1034     | 0.001025   | B0006Y8M7S | B00086Q244 | 1036    | 0.001027  | 1040    | 0.001031  |
| ... | ...      | ...        | ...        | ...        | ...     | ...       | ...     | ...       |
| 79  | 1158     | 0.001148   | 0140351310 | 0460112872 | 1158    | 0.001148  | 1158    | 0.001148  |
| 80  | 1833     | 0.001818   | B000BI4160 | B000I1VJLA | 1857    | 0.001841  | 1836    | 0.001821  |
| 81  | 1848     | 0.001833   | B000FAIRN2 | B000H7EO2G | 1859    | 0.001843  | 1848    | 0.001833  |
| 82  | 1841     | 0.001826   | B000GQK706 | B000H7EO2G | 1852    | 0.001837  | 1848    | 0.001833  |


In [49]:
df_pairs['Confidence A→B'] = df_pairs['Support_AB'] / df_pairs['Support_A']
df_pairs['Confidence B→A'] = df_pairs['Support_AB'] / df_pairs['Support_B']
df_pairs['Lift'] = df_pairs['Confidence A→B'] / (df_pairs['Support_B'])

df_pairs.sort_values(by='Lift', ascending=False)

Unnamed: 0,Count_AB,Support_AB,Item_A,Item_B,Count_A,Support_A,Count_B,Support_B,Confidence A→B,Confidence B→A,Lift
10,1010,0.001002,B000HKLROQ,B000TKO3EA,1010,0.001002,1010,0.001002,1.000000,1.000000,998.438614
4,1034,0.001025,B0006Y8M7S,B00086Q244,1036,0.001027,1040,0.001031,0.998069,0.994231,967.765613
35,1146,0.001136,0003300277,B000TZ19TC,1147,0.001137,1146,0.001136,0.999128,1.000000,879.183086
1,1158,0.001148,0435126024,0451518845,1158,0.001148,1158,0.001148,1.000000,1.000000,870.831606
40,1158,0.001148,0140351310,0582528259,1158,0.001148,1158,0.001148,1.000000,1.000000,870.831606
...,...,...,...,...,...,...,...,...,...,...,...
45,3562,0.003532,B000NDSX6C,B000NWU3I4,3577,0.003547,3562,0.003532,0.995807,1.000000,281.918647
0,3511,0.003482,B000NDSX6C,B000Q032UY,3577,0.003547,3511,0.003482,0.981549,1.000000,281.918647
65,3561,0.003531,B000ILIJE0,B000NWU3I4,3576,0.003546,3562,0.003532,0.995805,0.999719,281.918315
13,3407,0.003379,B000GQG5MA,B000ILIJE0,3408,0.003380,3576,0.003546,0.999707,0.952740,281.914737
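The three columns computed above follow the standard definitions: conf(A→B) = s(AB)/s(A) and lift = conf(A→B)/s(B) = s(AB)/(s(A)·s(B)). The following is a small self-contained sketch that computes them directly from a hypothetical list of baskets. The baskets and the `rule_metrics` helper are illustrative only:

```python
from itertools import combinations
from collections import Counter

# Hypothetical baskets: one set of reviewed book IDs per user
baskets = [
    {"A", "B"},
    {"A", "B", "C"},
    {"A", "C"},
    {"B"},
    {"A", "B"},
]
n = len(baskets)

# Number of baskets containing each item and each (sorted) pair of items
item_counts = Counter(item for basket in baskets for item in basket)
pair_counts = Counter(
    pair for basket in baskets for pair in combinations(sorted(basket), 2)
)

def rule_metrics(a, b):
    """Support of {a, b}, both confidences, and lift."""
    s_a = item_counts[a] / n
    s_b = item_counts[b] / n
    s_ab = pair_counts[tuple(sorted((a, b)))] / n
    conf_ab = s_ab / s_a      # P(b | a)
    conf_ba = s_ab / s_b      # P(a | b)
    lift = conf_ab / s_b      # = s_ab / (s_a * s_b)
    return s_ab, conf_ab, conf_ba, lift

print(rule_metrics("A", "B"))
print(rule_metrics("A", "C"))
```

Because lift is symmetric in A and B, a single `Lift` column is enough, while the two confidences differ whenever s(A) ≠ s(B).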


In [44]:
df_pairs.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 84 entries, 0 to 83
Data columns (total 11 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   Count_AB        84 non-null     int64  
 1   Support_AB      84 non-null     float64
 2   Item_A          84 non-null     object 
 3   Item_B          84 non-null     object 
 4   Count_A         84 non-null     int64  
 5   Support_A       84 non-null     float64
 6   Count_B         84 non-null     int64  
 7   Support_B       84 non-null     float64
 8   Confidence A→B  84 non-null     float64
 9   Confidence B→A  84 non-null     float64
 10  Lift            84 non-null     float64
dtypes: float64(6), int64(3), object(2)
memory usage: 7.3+ KB


Let's take, for example, the first association rule in the sorted table, B000HKLROQ → B000TKO3EA:
- A **confidence A→B** of 1.0 indicates that whenever item A appears in a basket, item B also appears.
- A **confidence B→A** of 1.0 means the converse holds as well: the two items occur in exactly the same 1,010 baskets.
- The extremely high **lift** (~998) exposes the main issue with this dataset: even though we tried to remove IDs sharing the same title, some duplicates are left because of typing mistakes or different editions of the same book.
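The size of the lift for such pairs follows from its definition. Since s(AB) ≤ min(s(A), s(B)), lift can never exceed the reciprocal of the larger support. It reaches that bound exactly when every basket containing the rarer item also contains the other:

```latex
\mathrm{lift}(A \to B) = \frac{s(AB)}{s(A)\,s(B)}
  \le \frac{\min\{s(A),\, s(B)\}}{s(A)\,s(B)}
  = \frac{1}{\max\{s(A),\, s(B)\}}
```

For the top rule all three supports are about 0.001002, so lift ≈ 1/0.001002 ≈ 998, in line with the table up to the rounding of the displayed supports. Rare items that are really the same book therefore end up with the largest lifts.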



The following code block simply queries the dataset to find the titles of the books related to a specific association rule:

In [58]:
query = "SELECT DISTINCT Id, Title FROM Ratings WHERE Id IN ('B000HKLROQ', 'B000TKO3EA')"
most_connected = spark.sql(query)
most_connected.show()


+----------+--------------------+
|        Id|               Title|
+----------+--------------------+
|B000TKO3EA|middlesex [unabri...|
|B000HKLROQ|           middlesex|
+----------+--------------------+



In [59]:
most_connected_rows = most_connected.collect()

In [60]:
most_connected_rows

[Row(Id='B000TKO3EA', Title='middlesex [unabridged audiobook]'),
 Row(Id='B000HKLROQ', Title='middlesex')]
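The two IDs differ only by an edition note in square brackets. One possible heuristic for catching such leftovers is to normalize titles before grouping IDs. This sketch assumes that edition notes always appear in square brackets or parentheses. The helpers `normalize_title` and `group_ids_by_title` are hypothetical and are not part of the notebook:

```python
import re
from collections import defaultdict

def normalize_title(title):
    """Lower-case, drop [...] and (...) annotations, collapse whitespace."""
    title = title.lower()
    # Assumption: edition notes such as "[unabridged audiobook]" sit in brackets
    title = re.sub(r"\[[^\]]*\]|\([^)]*\)", " ", title)
    title = re.sub(r"\s+", " ", title)
    return title.strip()

def group_ids_by_title(rows):
    """Map each normalized title to the set of IDs that share it."""
    groups = defaultdict(set)
    for book_id, title in rows:
        groups[normalize_title(title)].add(book_id)
    return dict(groups)

rows = [
    ("B000TKO3EA", "middlesex [unabridged audiobook]"),
    ("B000HKLROQ", "middlesex"),
]
print(group_ids_by_title(rows))
```

This does not catch typos, which would need fuzzy matching. It may also wrongly merge distinct books whose titles differ only inside brackets, so the resulting groups should be checked before collapsing IDs.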