<a href="https://colab.research.google.com/github/michelecoaro/SimItems/blob/main/SimItems.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
'''
installing libraries that are lacking in colab, only run this the first time
'''
%pip install --quiet pyspark==3.3.2 kaggle nltk pandas matplotlib
!pip install cloudpickle==2.2.1


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m12.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dataproc-spark-connect 0.8.2 requires pyspark[connect]~=3.5.1, but you have pyspark 3.3.2 which is incompatible.[0m[31m
[0mCollecting cloudpickle==2.2.1
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Downloading cloudpickle-2.2.1-py3-none-any.whl (25 kB)
Installing collected packages: cloudpickle
  Attempting uninstall: cloudpickle
    Found existing installation: cloudpickle 3.1.1
    Uninstalling cloudpickle-3.1.1:
      Su

In [2]:
'''
Imports
'''

import os
from pathlib import Path
import shutil
import subprocess
import sys
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, MinHashLSH
from pyspark.sql.functions import col, size
from pyspark.ml.functions import vector_to_array


In [3]:
'''
Starting the project off by defining all the constants, credentials and parameters that will be used throughout the notebook.

'''
DATA_DIR = Path("./data")
DATA_DIR.mkdir(exist_ok=True)

KAGGLE_USERNAME = "XXXXXXXXXXXXX" #"Insert your username here"
KAGGLE_KEY = "XXXXXXXXXXXXX" #"Insert your key here"

SUBSAMPLE = True
N_SAMPLE = 3000 #500000 is the amount used to replicate the results portrayed in the report, dropped to 3000 due to Colab lack of resources

HASH_BUCKETS = 2**20
LSH_TABLES = 5
JACCARD_THRESHOLD = 0.2
RANDOM_SEED = 42


In [4]:
'''
Downloading the dataset, ensuring that possible errors are handled.
Installing Kaggle if it is missing.
Cleaning data directory in case a bad download happened
Finally veryfiyng CSV presence
'''

os.environ["KAGGLE_USERNAME"] = KAGGLE_USERNAME
os.environ["KAGGLE_KEY"] = KAGGLE_KEY

try:
    import kaggle
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "kaggle"])
    import kaggle

dataset_slug = "mohamedbakhet/amazon-books-reviews"

bad_zip = DATA_DIR / "amazon-books-reviews.zip"
if bad_zip.exists():
    bad_zip.unlink()

subprocess.check_call(
    [
        "kaggle", "datasets", "download",
        "-d", dataset_slug,
        "-p", str(DATA_DIR),
        "--unzip",
        "--force",
        "--quiet",
    ]
)

csv_files = list(DATA_DIR.glob("*.csv"))
if not csv_files:
    raise FileNotFoundError("Download finished but no CSV files were found in ./data.")

print("Files in ./data:")
for p in csv_files:
    print(" •", p.name)


Files in ./data:
 • books_data.csv
 • Books_rating.csv


In [5]:
'''
Creating or retrieveing a functioning Spark session
'''

spark = (
    SparkSession.builder
    .appName("AmazonReviewSimilarity")
    .getOrCreate()
)

print("Spark version:", spark.version)


Spark version: 3.3.2


In [6]:
'''
Loading and preprocessing the dataset, by renaming columns and filtering out invalid reviews. Finishing with subsampling if requested.
'''


csv_path = Path("./data/Books_rating.csv")
if not csv_path.exists():
    raise FileNotFoundError(
        f"{csv_path} not found.\n"
        f"CSV files in ./data: {[p.name for p in Path('./data').glob('*.csv')]}"
    )

print("Loading", csv_path.name)

raw_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(str(csv_path))
)

print("Total rows (incl. null text):", raw_df.count())

rename_map = {
    "review/text": "review_text",
    "review/summary": "review_summary",
}
for old, new in rename_map.items():
    if old in raw_df.columns:
        raw_df = raw_df.withColumnRenamed(old, new)

df = (
    raw_df
    .filter(F.col("review_text").isNotNull() & (F.length("review_text") > 0))
    .select(
        F.col("Id").cast("string").alias("review_id"),
        "review_text"
    )
)

if SUBSAMPLE:
    df = df.sample(False, 1.0, seed=RANDOM_SEED).limit(N_SAMPLE)

print("Working set size:", df.count())
df.show(5, truncate=100)


Loading Books_rating.csv
Total rows (incl. null text): 3000000
Working set size: 3000
+----------+----------------------------------------------------------------------------------------------------+
| review_id|                                                                                         review_text|
+----------+----------------------------------------------------------------------------------------------------+
|1882931173|This is only for Julie Strain fans. It's a collection of her photos -- about 80 pages worth with ...|
|0826414346|I don't care much for Dr. Seuss but after reading Philip Nel's book I changed my mind--that's a g...|
|0826414346|                       "If people become the books they read and if ""the child is father to the man|
|0826414346|Theodore Seuss Geisel (1904-1991), aka &quot;Dr. Seuss,&quot; was one of the most influential wri...|
|0826414346|"Philip Nel - Dr. Seuss: American IconThis is basically an academic overview of Seuss poetry, art...|
+-

In [7]:
'''
Defining the pipeline
'''
tokenizer = RegexTokenizer(
    inputCol="review_text",
    outputCol="tokens",
    pattern="\\W+"
)
remover = StopWordsRemover(
    inputCol="tokens",
    outputCol="filtered_tokens"
)
tf = HashingTF(
    inputCol="filtered_tokens",
    outputCol="features",
    numFeatures=HASH_BUCKETS,
    binary=True
)
lsh = MinHashLSH(
    inputCol="features",
    outputCol="hashes",
    numHashTables=LSH_TABLES
)

pipeline = Pipeline(stages=[tokenizer, remover, tf, lsh])


model = pipeline.fit(df)
tmp = model.transform(df).select("review_id", "features")


df_hashed = (
    tmp
      .withColumn("arr", vector_to_array("features"))
      .filter(  # keep only if any element != 0
          F.expr("exists(arr, x -> x != 0)")
      )
      .drop("arr")
      .cache()
)

print("Rows kept after removing zero‐vectors:", df_hashed.count())
df_hashed.show(3, truncate=False)

Rows kept after removing zero‐vectors: 3000
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Cell 8: Identification of similiar pairs through LSH

In [8]:
'''
Identifying similar pairs through LSH
'''

lsh_model = model.stages[-1]

similar_pairs = (
    lsh_model.approxSimilarityJoin(
        df_hashed, df_hashed, JACCARD_THRESHOLD, distCol="jaccard_distance"
    )
    .filter(F.col("datasetA.review_id") < F.col("datasetB.review_id"))
    .select(
        F.col("datasetA.review_id").alias("review_id_1"),
        F.col("datasetB.review_id").alias("review_id_2"),
        "jaccard_distance",
    )
    .distinct()
)

print(
    f"Candidate duplicate pairs (distance ≤ {JACCARD_THRESHOLD}): "
    f"{similar_pairs.count()}"
)
similar_pairs.show(10, truncate=False)


Candidate duplicate pairs (distance ≤ 0.2): 0
+-----------+-----------+----------------+
|review_id_1|review_id_2|jaccard_distance|
+-----------+-----------+----------------+
+-----------+-----------+----------------+



Cell 9: Injecting syntethic results into original dataset

In [10]:
'''
Injecting duplicates into the original dataset
'''
dup_source = df.orderBy(F.rand(seed=RANDOM_SEED)).limit(50) #Used 250 for the report
duplicated = dup_source.withColumn(
    "review_id", F.concat(F.lit("dup_"), F.col("review_id"))
)
df_augmented = df.unionByName(duplicated)
print("Augmented dataset size:", df_augmented.count())

model_aug = pipeline.fit(df_augmented)

df_aug_hashed = (
    model_aug
      .transform(df_augmented)
      .select("review_id", "features")
      # explode the SparseVector into Array<Double>
      .withColumn("arr", vector_to_array("features"))
      # keep only rows where at least one entry != 0
      .filter(F.expr("exists(arr, x -> x != 0)"))
      .drop("arr")
      .cache()
)

aug_lsh_model = model_aug.stages[-1]
pred_pairs = (
    aug_lsh_model
      .approxSimilarityJoin(
          df_aug_hashed, df_aug_hashed,
          threshold=JACCARD_THRESHOLD,
          distCol="jaccard_distance"
      )
      .filter(F.col("datasetA.review_id") < F.col("datasetB.review_id"))
      .select(
        F.col("datasetA.review_id").alias("id1"),
        F.col("datasetB.review_id").alias("id2")
      )
      .distinct()
      .cache()
)

truth = dup_source.select(
    F.col("review_id").alias("id1"),
    F.concat(F.lit("dup_"), F.col("review_id")).alias("id2")
)

tp        = pred_pairs.join(truth, ["id1","id2"]).count()
pred_cnt  = pred_pairs.count()
truth_cnt = truth.count()

precision = tp / pred_cnt if pred_cnt else 0.0
recall    = tp / truth_cnt if truth_cnt else 0.0

print(f"Ground-truth duplicate pairs : {truth_cnt}")
print(f"Correctly predicted (TP)      : {tp}")
print(f"Precision                     : {precision:.3f}")
print(f"Recall                        : {recall:.3f}")

Augmented dataset size: 3050
Ground-truth duplicate pairs : 50
Correctly predicted (TP)      : 50
Precision                     : 1.786
Recall                        : 1.000


Cell 10: Examples with duplicate review

In [11]:
'''
Example cell with duplicates
'''
similar_pairs_with_text = (
    similar_pairs
    .join(df.select(F.col("review_id").alias("review_id_1"), F.col("review_text").alias("review_text_1")), on="review_id_1")
    .join(df.select(F.col("review_id").alias("review_id_2"), F.col("review_text").alias("review_text_2")), on="review_id_2")
    .select("review_id_1", "review_id_2", "jaccard_distance", "review_text_1", "review_text_2")
)

similar_pairs_with_text.show(5, truncate=200)


+-----------+-----------+----------------+-------------+-------------+
|review_id_1|review_id_2|jaccard_distance|review_text_1|review_text_2|
+-----------+-----------+----------------+-------------+-------------+
+-----------+-----------+----------------+-------------+-------------+

