### Test

In [24]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("TestSpark") \
    .getOrCreate()

df = spark.range(10)
df.show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



## Exercise 1 : Inspect and clean the Raw Data

In [25]:
# import the modules, and create the SparkSession

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("BookCF_Project") \
    .getOrCreate()


In [26]:
# import Book.cvs
file_path = "data/Books.csv"  
df_raw = spark.read.csv(file_path, header=True, inferSchema=True)

# check the schema
df_raw.printSchema()

# get the number of total rows
print("Raw row count:", df_raw.count())


root
 |-- _c0: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- age: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- book_title: string (nullable = true)
 |-- book_author: string (nullable = true)
 |-- year_of_publication: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- img_s: string (nullable = true)
 |-- img_m: string (nullable = true)
 |-- img_l: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)

Raw row count: 515339


1. Clean `ISBN` 

In [27]:

# only keep figures
df1 = df_raw.withColumn(
    "isbn_clean",
    F.regexp_replace(F.col("isbn"), "[^0-9]", "")
)

# drop the blank cells
df1 = df1.filter(F.length(F.col("isbn_clean")) > 0)

# count non-empty ISBN
isbn_counts = df1.groupBy("isbn_clean").agg(F.count("*").alias("isbn_count"))
isbn_counts.orderBy(F.col("isbn_count").desc()).show(10)

# keep ISBN which appears >= 2 times
isbn_keep = isbn_counts.filter(F.col("isbn_count") >= 2).select("isbn_clean")

df1 = df1.join(isbn_keep, on="isbn_clean", how="inner")

# update
df1 = df1.drop("isbn").withColumnRenamed("isbn_clean", "isbn")

print("After ISBN cleaning + freq>=2, rows:", df1.count())


+----------+----------+
|isbn_clean|isbn_count|
+----------+----------+
| 971880107|      2501|
| 316666343|      1295|
| 385504209|       883|
|  60928336|       732|
| 312195516|       723|
| 044023722|       647|
| 142001740|       615|
| 067976402|       614|
| 671027360|       586|
| 446672211|       585|
+----------+----------+
only showing top 10 rows

After ISBN cleaning + freq>=2, rows: 101786


2. We selected these columns `user_id`, `isbn`, `rating`, `book_title`, `book_author`, `year_of_poblication`, `publisher`, `Summary`, `Language`, `Category` to analyze and dropped the remaining columns, including demographics (`location`, `age`, `state`, `country`).

In [28]:
# drop the index row
df1 = df1.drop("_c0")

ratings_raw = df1.select(
    "user_id",
    "isbn",
    "rating",
    "book_title",
    "book_author",
    "year_of_publication",
    "publisher",
    "Summary",
    "Language",
    "Category"
)

ratings_raw.printSchema()


root
 |-- user_id: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- book_title: string (nullable = true)
 |-- book_author: string (nullable = true)
 |-- year_of_publication: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- Category: string (nullable = true)



3. Clean `rating`:  
**Step 5 in the PDF given by prof <- moved earlier to ensure data validity**
- string -> float[0,10]
- remove invaild ratings
- keep the rating 0, but it will be treated as "no rating" in the collaborative filtering stage

In [29]:
# string -> float[0,10]
ratings_num = ratings_raw.withColumn(
    "rating",
    F.col("rating").cast("float")
)

# remove invaild ratings
ratings_valid = ratings_num.filter(
    (F.col("rating").isNotNull()) &
    (F.col("rating") >= 0.0) &
    (F.col("rating") <= 10.0)
)

print("Original row number:", ratings_raw.count())
print("Row number with valided rating:", ratings_valid.count())

ratings_valid.groupBy("rating").count().orderBy("rating").show(30)


Original row number: 101786
Row number with valided rating: 101562
+------+-----+
|rating|count|
+------+-----+
|   0.0|59056|
|   1.0|  200|
|   2.0|  296|
|   3.0|  591|
|   4.0|  771|
|   5.0| 3716|
|   6.0| 3077|
|   7.0| 6833|
|   8.0|10632|
|   9.0| 7918|
|  10.0| 8472|
+------+-----+



4. Count the number of ratings for each user and only retain active users who give >= 5 ratings

Users with fewer than 5 ratings are removed because Pearson correlation requires enough observations to be meaningful.

In [30]:
user_activity = ratings_valid.groupBy("user_id").agg(F.count("*").alias("num_ratings"))

user_activity.orderBy(F.col("num_ratings").desc()).show(10)

users_active = user_activity.filter(F.col("num_ratings") >= 5)

print("Active user number:", users_active.count())

ratings_active = ratings_valid.join(
    users_active.select("user_id"),
    on="user_id",
    how="inner"
)

print("Total rating number after retaining only active users:", ratings_active.count())



+-------+-----------+
|user_id|num_ratings|
+-------+-----------+
|  11676|        664|
|    254|        280|
|  35859|        255|
|  16795|        222|
| 153662|        208|
|  76352|        203|
| 230522|        202|
|  60244|        196|
|  55492|        187|
| 204864|        184|
+-------+-----------+
only showing top 10 rows

Active user number: 3876
Total rating number after retaining only active users: 66703


5. Only retain users whose rated books overlap with others by at least one.

In [31]:
# Only reamain books rated by 2 or more users 
book_user_counts = ratings_active.groupBy("isbn") \
    .agg(F.countDistinct("user_id").alias("num_users_for_book"))

overlap_books = book_user_counts.filter(F.col("num_users_for_book") >= 2) \
    .select("isbn")

# All users related these books
ratings_on_overlap_books = ratings_active.join(overlap_books, on="isbn", how="inner")
overlap_users = ratings_on_overlap_books.select("user_id").distinct()

print("Users with at least one overlapping book:", overlap_users.count())

# In the original active rating table, only retain the ratings of overlap_users 
ratings_overlap_users = ratings_active.join(
    overlap_users,
    on="user_id",
    how="inner"
)

print("Rows after removing isolated users:", ratings_overlap_users.count())

ratings_overlap_users.show(5)


Users with at least one overlapping book: 3876
Rows after removing isolated users: 66703
+-------+-------+------+------------+--------------------+-------------------+--------------------+--------------------+--------+--------+
|user_id|   isbn|rating|  book_title|         book_author|year_of_publication|           publisher|             Summary|Language|Category|
+-------+-------+------+------------+--------------------+-------------------+--------------------+--------------------+--------+--------+
|      8|2005018|   5.0|Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|In a small town i...|    NULL|    NULL|
|  11400|2005018|   0.0|Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|In a small town i...|    NULL|    NULL|
|  11676|2005018|   8.0|Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|In a small town i...|    NULL|    NULL|
|  85526|2005018|   0.0|Clara Callan|Richard Bruce Wright|               2001

6. Filter out books: 
- Each book must have >= 5 ratings
- Remove books with extreme high/low rating given by few people

Books with fewer than 5 ratings are removed because they do not provide enough data to compute reliable correlations with other books or users so that the similarity estimates will be more robust

In [32]:
book_unusual = ratings_overlap_users.groupBy("isbn") \
    .agg(
        F.count("*").alias("num_ratings"),
        F.avg("rating").alias("avg_rating")
    )

suspicious_books = (
    (F.col("num_ratings") <= 5) &
    (
        (F.col("avg_rating") >= 9.5) |
        (F.col("avg_rating") <= 1.0)
    )
)

books_keep = book_unusual.filter(
    (F.col("num_ratings") >= 5) &  
    ~suspicious_books               
).select("isbn")

print("Books kept after filering:",
      books_keep.count())

ratings_filtered_books = ratings_overlap_users.join(
    books_keep,
    on="isbn",
    how="inner"
)

print("Rows after filtering unpopular / extreme books:",
      ratings_filtered_books.count())

ratings_filtered_books.show(5)

Books kept after filering: 1247
Rows after filtering unpopular / extreme books: 65292
+-------+-------+------+------------+--------------------+-------------------+--------------------+--------------------+--------+--------+
|   isbn|user_id|rating|  book_title|         book_author|year_of_publication|           publisher|             Summary|Language|Category|
+-------+-------+------+------------+--------------------+-------------------+--------------------+--------------------+--------+--------+
|2005018|      8|   5.0|Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|In a small town i...|    NULL|    NULL|
|2005018|  11400|   0.0|Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|In a small town i...|    NULL|    NULL|
|2005018|  11676|   8.0|Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|In a small town i...|    NULL|    NULL|
|2005018|  85526|   0.0|Clara Callan|Richard Bruce Wright|               2001|Ha

7. Remove duplicate books

To remove duplicate books, we group editions by normalized (`title`,`author`) and select a canonical ISBN per group. All ratings for other editions are mapped to this canonical ISBN.


In [33]:
# Normalize book_title and book_author
ratings_norm = ratings_filtered_books.withColumn(
    "title_norm",
    F.lower(F.regexp_replace(F.col("book_title"), r"\s+", " "))
).withColumn(
    "author_norm",
    F.lower(F.regexp_replace(F.col("book_author"), r"\s+", " "))
)

# Count the number of ratings for each ISBN within every (title_norm, author_norm, isbn) group.
book_isbn_stats = ratings_norm.groupBy(
    "title_norm", "author_norm", "isbn"
).agg(
    F.count("*").alias("num_ratings_for_isbn")
)

# For each (title_norm, author_norm) group, select the ISBN with the largest number of ratings as the canonical ISBN.
w = Window.partitionBy("title_norm", "author_norm") \
          .orderBy(F.col("num_ratings_for_isbn").desc(), F.col("isbn"))

can_isbn_per_book = book_isbn_stats.withColumn(
    "rank_in_group",
    F.row_number().over(w)
).filter(
    F.col("rank_in_group") == 1
).select(
    "title_norm", "author_norm", F.col("isbn").alias("isbn_can")
)

# Create mapping table
isbn_with_norm = book_isbn_stats.select("title_norm", "author_norm", "isbn")

edition_to_rep = isbn_with_norm.join(
    can_isbn_per_book,
    on=["title_norm", "author_norm"],
    how="left"
).select(
    "isbn", "isbn_can"
)

# Replace each edition’s ISBN in the ratings table with its corresponding canonical ISBN
ratings_mapped = ratings_norm.join(
    edition_to_rep,
    on="isbn",
    how="left"
).withColumn(
    "isbn_final",
    F.coalesce(F.col("isbn_can"), F.col("isbn"))  
)

# In the ratings table, a single user may have rated multiple editions of the same logical book. 
# Therefore, we aggregate the data by (user_id, isbn_final) to ensure that each user–book pair has only one rating.
ratings_rep_books = ratings_mapped.groupBy(
    "user_id", "isbn_final"
).agg(
    F.avg("rating").alias("rating"),               
    F.count("*").alias("num_times"),               
    F.first("book_title").alias("book_title"),
    F.first("book_author").alias("book_author"),
    F.first("year_of_publication").alias("year_of_publication"),
    F.first("publisher").alias("publisher"),
    F.first("Summary").alias("Summary"),
    F.first("Language").alias("Language"),
    F.first("Category").alias("Category")
).withColumnRenamed(
    "isbn_final", "isbn"   
)

print("Row number before merging editions:", ratings_mapped.count())
print("Row number after merging editions to canonical ISBNs:", ratings_mapped.count())
ratings_mapped.show(5, truncate=False)

Row number before merging editions: 65292
Row number after merging editions to canonical ISBNs: 65292
+---------+-------+------+---------------+-----------+-------------------+------------------------+---------------------------------------------------------------------+--------+--------+---------------+-----------+---------+----------+
|isbn     |user_id|rating|book_title     |book_author|year_of_publication|publisher               |Summary                                                              |Language|Category|title_norm     |author_norm|isbn_can |isbn_final|
+---------+-------+------+---------------+-----------+-------------------+------------------------+---------------------------------------------------------------------+--------+--------+---------------+-----------+---------+----------+
|042518630|8067   |2.0   |Purity in Death|J.D. Robb  |2002               |Berkley Publishing Group|Eve Dallas must face the impossible: someone has unleashed a computer|NULL    |NULL    |

8. Merge duplicates: Only one rating is retained for each user and each book

Users may have rated the same book several times, for exemple, rating the same book which has different versions twice.
If a user rated the same book twice, keep the highest rating. We keep one copy of book metadata (using `.first()`) and book metadata will be cleaned and reconstructed later.

In [34]:
ratings_merged = ratings_mapped.groupBy(
    "user_id", "isbn_final"
).agg(
    F.max("rating").alias("rating"),               
    F.count("*").alias("num_times"),               
    F.first("book_title").alias("book_title"),
    F.first("book_author").alias("book_author"),
    F.first("year_of_publication").alias("year_of_publication"),
    F.first("publisher").alias("publisher"),
    F.first("Summary").alias("Summary"),
    F.first("Language").alias("Language"),
    F.first("Category").alias("Category")
).withColumnRenamed(
    "isbn_final", "isbn"   
)

print("Row number before deduplication:", ratings_mapped.count())
print("Row number after deduplication:", ratings_merged.count())
ratings_merged.show(5, truncate=False)


Row number before deduplication: 65292
Row number after deduplication: 65056
+-------+---------+------+---------+---------------------------------------------------------------------------------------------+-----------------+-------------------+----------------+----------------------------------------------------------------------+--------+--------+
|user_id|isbn     |rating|num_times|book_title                                                                                   |book_author      |year_of_publication|publisher       |Summary                                                               |Language|Category|
+-------+---------+------+---------+---------------------------------------------------------------------------------------------+-----------------+-------------------+----------------+----------------------------------------------------------------------+--------+--------+
|100009 |385504209|8.0   |1        |The Da Vinci Code                                             

We first materialize an intermediate cleaned dataset (`ratings_merged`) before running k-core. This "cut" breaks the long Spark lineage (raw → cleaning → ISBN normalization → deduplication), so the later k-core iterations run on a compact, stable table instead of recomputing the whole pipeline every time.

Due to stability and filesystem limitations of Spark on Windows (Hadoop + `winutils.exe` + local permissions), writing intermediate results directly with `df.write.parquet(...)` was unreliable in my environment. As a workaround, we used `toPandas()` **only to export** the already cleaned Spark DataFrame (`ratings_merged_for_core`) to disk (Parquet), and then reloaded this file back into Spark in a fresh session for the k-core iterations and all downstream processing. 

Pandas is **not used for any filtering, aggregation, or modeling steps**; it serves purely as a lightweight I/O bridge to materialize a stable checkpoint between two Spark stages.

Then we apply an iterative 5-core on the user–item graph (users and books as nodes, ratings as edges), shown in data_cleaning_02.ipynb. In practice, this means repeatedly removing users with fewer than 5 ratings and books with fewer than 5 ratings (and extreme low-support items). The result is a denser, more reliable interaction subgraph where both users and books have enough data to compute meaningful similarities and train recommendation models.


[x] 1. if all `ratings` are within the range [0,10]

In [None]:

# Cutting the ligeage and exporting the cleaned core ratings 

ratings_merged_for_core = ratings_merged.select(
    "user_id",
    "isbn",
    "rating",
    "book_title",
    "book_author",
    "year_of_publication",
    "publisher",
    "Summary",
    "Language",
    "Category"
)

pdf_core = ratings_merged_for_core.toPandas()
print(pdf_core.shape)

import os
import sys

if getattr(sys, 'frozen', False):  
    base_dir = os.path.dirname(sys.executable)
else:  
    base_dir = os.path.dirname(os.path.abspath(__file__))

out_dir = os.path.join(base_dir, "export_core")
os.makedirs(out_dir, exist_ok=True)

output_path = os.path.join(out_dir, "ratings_merged_for_core.parquet")
print("Saving to:", output_path)

pdf_core.to_parquet(output_path, index=False)








(65056, 10)
Saving to: D:\projet_esilv\Mining\export_core\ratings_merged_for_core.parquet
