<a href="https://colab.research.google.com/github/ssssarett/similar-book-reviews/blob/main/Copia_di_PROGETTO.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Project 1: Finding similar items**
The task is to implement a detector of pairs of similar book reviews.
You can choose how to encode reviews and how to measure their similarity, although a simple choice would be that of processing the review/text column of the Books_rating.csv file, using
the Jaccard similarity.

# 0. SETUP & LIBRARIES

In [None]:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq
!pip install pyspark --quiet
!pip install -U sentence-transformers

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [76]:
# LIBRARIES
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, size, length, rand, monotonically_increasing_id
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import HashingTF, MinHashLSH, Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
import os, re, shutil, glob
import pandas as pd
from google.colab import files

In [118]:
# PARAMETERS
SUBSAMPLE_SIZE = 1000
sim_threshold = 0.2
join_limit = 10000
k = 5
numFeatures = 1 << 18
numHashTables = 5
duplicate_threshold = 0.75
semantic_threshold = 0.3

USE_FULL_DATASET = False
FOCUS_BOOK_TITLE = None
MIN_REVIEW_LENGTH = 20
MIN_RATING = None


# 1. KAGGLE IMPORT
🚨 in the final version of the project the "xxxxxx" strings should be re-introduced in order to not share sensitive
information.

In [None]:
os.environ['KAGGLE_USERNAME'] = "ssarett" # "xxxxxx"
os.environ['KAGGLE_KEY'] = "f255453d448627d02504a9a8af388057" # "xxxxxx"

# dataset Amazon Books Reviews
!kaggle datasets download -d mohamedbakhet/amazon-books-reviews

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
Downloading amazon-books-reviews.zip to /content
 99% 1.05G/1.06G [00:07<00:00, 96.9MB/s]
100% 1.06G/1.06G [00:07<00:00, 144MB/s] 


In [None]:
# unzip files in directory "data"
!unzip amazon-books-reviews.zip -d ./data

Archive:  amazon-books-reviews.zip
  inflating: ./data/Books_rating.csv  
  inflating: ./data/books_data.csv   


In [97]:
# check
!ls -lh ./data

total 2.9G
-rw-r--r-- 1 root root 173M Sep 13  2022 books_data.csv
-rw-r--r-- 1 root root 2.7G Sep 13  2022 Books_rating.csv


# 2. SPARKSESSION

In [98]:
spark = SparkSession.builder \
    .appName("SimilarReviews") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [94]:
spark.catalog.clearCache()

# 3. READ DATA & BASIC FILTERING

In [99]:
df = spark.read.csv("data/Books_rating.csv", header=True, inferSchema=True) \
         .withColumnRenamed("review/text", "review_text") \
         .withColumnRenamed("review/score", "rating")

In [100]:
df = df.filter(col("review_text").isNotNull()) \
       .filter(length("review_text") > MIN_REVIEW_LENGTH)
if MIN_RATING:
    df = df.filter(col("rating") >= MIN_RATING)
if FOCUS_BOOK_TITLE:
    df = df.filter(col("book_title").like(f"%{FOCUS_BOOK_TITLE}%"))
if not USE_FULL_DATASET:
    df = df.orderBy(rand(seed=42)).limit(SUBSAMPLE_SIZE)

In [None]:
df.head()

Row(bookId='1891661426', Title='How to Win Lotteries, Sweepstakes, and Contests in the 21st Century', Price='10.91', User_id='A2N5NEEIK9GUGJ', profileName='Venda L. Miller', review/helpfulness='1/1', rating='2.0', review/time='1302048000', review/summary='How to Win Lotteries, Sweepstakes and Contest', review_text="I was looking for information that isn't common knowledge or common sense. Everything in this book most of the population currently know. He did not tell me anything that I didn't already knew. Great and catchy Front cover.")

In [82]:
df.count()

10000

# 4. PREPROCESSING

In [101]:
# clean text: punctuation, lowercase, normalize whitespace
def clean_text(text):
    if text is None:
        return ""
    t = text.lower()
    t = re.sub(r"[^\w\s]", "", t)
    return re.sub(r"\s+", " ", t).strip()

clean_udf = udf(clean_text, StringType())
df = df.withColumn("cleaned", clean_udf(col("review_text")))

# 5. SHINGLING

In [102]:
def char_shingles(text, k=k):
    if text is None or len(text) < k:
        return []
    return list({ text[i:i+k] for i in range(len(text)-k+1) })
shingle_udf = udf(char_shingles, ArrayType(StringType()))
df = df.withColumn("shingles", shingle_udf(col("cleaned"))) \
       .withColumn("id", monotonically_increasing_id()) \
       .filter(size(col("shingles")) > 0)

## FEATURE VECTORIZATION: HashingTF

In [103]:
htf = HashingTF(inputCol="shingles", outputCol="features", numFeatures=numFeatures)
df = htf.transform(df)
df.cache()

DataFrame[id: bigint, Title: string, Price: string, User_id: string, profileName: string, review/helpfulness: string, rating: string, review/time: string, review/summary: string, review_text: string, cleaned: string, shingles: array<string>, features: vector]

# 6. LSH & JACCARD

In [104]:
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=numHashTables)
model = mh.fit(df)

dist_threshold = 1.0 - sim_threshold
pairs = model.approxSimilarityJoin(df, df, dist_threshold, distCol="jaccardDistance")
if join_limit:
    pairs = pairs.limit(join_limit)

In [105]:
jaccard_results = pairs \
    .filter(col("datasetA.id") < col("datasetB.id")) \
    .withColumn("jaccardSimilarity", 1 - col("jaccardDistance")) \
    .filter(col("jaccardSimilarity") <= duplicate_threshold) \
    .select(
        col("datasetA.id").alias("id1"),
        col("datasetB.id").alias("id2"),
        col("datasetA.review_text").alias("review1"),
        col("datasetB.review_text").alias("review2"),
        col("datasetA.rating").alias("rating1"),
        col("datasetB.rating").alias("rating2"),
        col("jaccardSimilarity")
    )

# 7. TF-IDF & COSINE

In [119]:
tokenizer = Tokenizer(inputCol="cleaned", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
cv = CountVectorizer(inputCol="filtered", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="tfidfFeatures")

In [120]:
pipeline = Pipeline(stages=[tokenizer, remover, cv, idf])
tfidf_model = pipeline.fit(df)
df_tfidf = tfidf_model.transform(df)
cosine_udf = udf(lambda v1, v2: float(v1.dot(v2)/(Vectors.norm(v1,2)*Vectors.norm(v2,2))), "double")

In [124]:
df1 = df_tfidf.select(col("id").alias("id1"), col("tfidfFeatures").alias("v1"))
df2 = df_tfidf.select(col("id").alias("id2"), col("tfidfFeatures").alias("v2"))
jacc_pairs = jaccard_results.select("id1","id2")
df1_sub = df1.join(jacc_pairs, on="id1")
joined = df1_sub.join(df2, on="id2")

In [125]:
final_results = joined.withColumn("cosine_similarity", cosine_udf(col("v1"), col("v2"))) \
    .filter(col("cosine_similarity") >= semantic_threshold) \
    .filter(col("cosine_similarity") <= duplicate_threshold)

merged_results = final_results \
    .join(jaccard_results.select("id1", "id2", "review1", "review2", "jaccardSimilarity"), on=["id1", "id2"], how="inner") \
    .select("id1", "id2", "review1", "review2", "jaccardSimilarity", "cosine_similarity")

# 8. RESULTS PREVIEW AND EXPORT

In [112]:
print("Coppie totali generate da LSH (prima del filtro):", pairs.count())

Coppie totali generate da LSH (prima del filtro): 1018


In [111]:
print("Totale coppie trovate da LSH (prima del filtro Cosine):", jaccard_results.count())

Totale coppie trovate da LSH (prima del filtro Cosine): 8


In [110]:
jaccard_results.show(truncate=False)

+---+---+------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+-------+-------+-------------------+
|id1|id2|review1                                                                                                                       |review2                                                                                                                 |rating1|rating2|jaccardSimilarity  |
+---+---+------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+-------+-------+-------------------+
|808|871|I received the book in a timely manner and in the condition described, thank you.                            

In [72]:
jaccard_results_pd = jaccard_results.toPandas()
jaccard_results_pd.to_html("jaccard_results.html", index=False)
files.download("jaccard_results.html")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [127]:
merged_results.show(truncate=False)

+---+---+------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+-------------------+-------------------+
|id1|id2|review1                                                                                                                       |review2                                                                                                                 |jaccardSimilarity  |cosine_similarity  |
+---+---+------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+-------------------+-------------------+
|173|709|Like before...I am pleased to say your product was sent in a timely fashion and in very good cond

In [128]:
merged_results_pd = merged_results.toPandas()
merged_results_pd.to_html("merged_results.html", index=False)
files.download("merged_results.html")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# 11. STOP SPARK SESSION

In [95]:
spark.stop()