In [None]:
# We import the needed libraries
import heapq
import os, re, time, math, pickle
import zipfile
from itertools import combinations
from multiprocessing import Pool, cpu_count

import nltk
import pandas as pd
import psutil
from nltk.corpus import stopwords
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.types import StringType, BooleanType
from pyspark.ml.feature import (
    Tokenizer,
    StopWordsRemover,
    HashingTF,
    MinHashLSH
)
from tqdm import tqdm

In [None]:

# Kaggle credentials are exported as environment variables for the download
# command below. The values here are redacted placeholders and must be replaced
# before running.
# NOTE(review): presumably the `kaggle` CLI reads these two variables; prefer
# supplying them from outside the notebook instead of committing real secrets.
os.environ['KAGGLE_USERNAME'] = "xxxxxxxxxx"
os.environ['KAGGLE_KEY'] = "xxxxxxxxxx"
# Shell magic: download the "amazon-books-reviews" dataset archive into the
# current working directory (skipped when a newer local copy already exists,
# as the recorded cell output shows).
!kaggle datasets download -d'mohamedbakhet/amazon-books-reviews'

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
amazon-books-reviews.zip: Skipping, found more recently modified local copy (use --force to force download)


In [None]:
# Unpack every member of the downloaded archive into the current working
# directory (i.e., the root directory in Colab).
with zipfile.ZipFile('amazon-books-reviews.zip', mode='r') as archive:
    archive.extractall(path=os.getcwd())

In [None]:
def jaccard(a, b):
    """Return the Jaccard similarity |A ∩ B| / |A ∪ B| of two collections.

    Both arguments are converted to sets first, so duplicates are ignored.
    If either set is empty the similarity is defined as 0.0.
    """
    left = set(a)
    right = set(b)
    if not left or not right:
        return 0.0
    shared = left.intersection(right)
    combined = left.union(right)
    return float(len(shared) / len(combined))

def tokenize(text: str):
    """Turn a review text into a list of lowercase tokens without stopwords.

    Missing values (anything `pd.isna` reports as NA) yield an empty list.
    Otherwise the text is lowercased, every match of the module-level
    `TOKEN_RE` pattern is extracted, and matches found in the module-level
    `STOP_WORDS` collection are dropped. Both globals must be defined before
    the first call.
    """
    if pd.isna(text):
        return []
    kept = []
    for word in TOKEN_RE.findall(text.lower()):
        if word not in STOP_WORDS:
            kept.append(word)
    return kept

In [None]:
# We test the Brute force Jaccard:
# every pair of the first SUBSAMPLE_SIZE reviews is scored and the top_k most
# similar pairs are kept, while wall-clock time and resident memory are recorded.

res = {
    'brute': []}
SUBSAMPLE_SIZE     = 10000
top_k= 10
file_path = r"Books_rating.csv"
proc = psutil.Process(os.getpid())
TOKEN_RE = re.compile(r"\b[a-z]{2,}\b")
STOP_WORDS = set(stopwords.words('english'))


start = time.time(); mem0 = proc.memory_info().rss/(1024**2)


df_b = pd.read_csv(file_path, usecols=['review/text'], nrows=SUBSAMPLE_SIZE)
df_b['tokens'] = df_b['review/text'].fillna('').map(tokenize)


# Build each document's token set once, and keep the row labels in a plain
# list, so the pair loop does cheap list indexing instead of two `df.loc`
# lookups per pair.
doc_ids = df_b.index.tolist()
token_sets = [set(tokens) for tokens in df_b['tokens']]
n_docs = len(token_sets)
n_pairs = n_docs * (n_docs - 1) // 2

# compute all pairs and scores lazily: the generator yields one
# (doc_i, doc_j, score) tuple at a time, so the full list of n_pairs tuples is
# never held in memory.
pairs = combinations(range(n_docs), 2)
scored_pairs = (
    (doc_ids[i], doc_ids[j], jaccard(token_sets[i], token_sets[j]))
    for i, j in tqdm(pairs, total=n_pairs)
)
# heapq.nlargest keeps only top_k candidates at any time and returns them in the
# same order as sorting all scores in descending order (ties stay in
# generation order).
top = heapq.nlargest(top_k, scored_pairs, key=lambda x: x[2])
res['brute'] = top
elapsed_time = time.time() - start; mem1 = proc.memory_info().rss/(1024**2)

print(f"Brute-force top-{top_k}:", res['brute'])

print(f"Resources Used: {elapsed_time:.2f}s, Δmem: {mem1-mem0:.1f}MiB")


49995000it [29:20, 28394.54it/s]


Brute-force top-10: [(253, 256, 1.0), (262, 267, 1.0), (422, 423, 1.0), (428, 429, 1.0), (535, 536, 1.0), (596, 597, 1.0), (724, 790, 1.0), (726, 727, 1.0), (776, 784, 1.0), (884, 889, 1.0)]
Resources Used: 1791.96s, Δmem: 5046.8MiB


In [None]:
#!pip3 install pyspark
import pyspark
from pyspark.sql import SparkSession

import pyspark.sql.functions as f
from pyspark.sql import Window

# This is the first place a SparkSession is created, so this is where the
# driver JVM gets launched. JVM-level settings such as the driver heap size
# only take effect at launch: any later `getOrCreate()` call simply returns
# this session and cannot resize it. The memory limits are therefore set here.
spark = (
    SparkSession.builder
    .appName("AMD Project")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

In [None]:

start = time.time(); mem0 = proc.memory_info().rss/(1024**2)
# 1) We initiate a spark session
# NOTE: if a session already exists (e.g. one created by an earlier cell),
# getOrCreate() returns that session and JVM-level settings such as
# spark.driver.memory are not re-applied to the already running driver.
spark = (
    SparkSession.builder
    .appName("MinHashSimilarReviews")
    .config("spark.driver.memory",  "8g")
    .config("spark.executor.memory","8g")
    .getOrCreate()
)

# 2) Load & clean
# Only the review text (a string column) is kept, so schema inference, which
# costs an extra full pass over the CSV, is skipped and columns are read as
# strings. Null/empty reviews are dropped and each remaining row gets a unique
# (not necessarily consecutive) `doc_id`.
df = (
    spark.read
         .csv(file_path, header=True)
         .withColumnRenamed("review/text", "text")
         .select("text")
         .filter(col("text").isNotNull() & (col("text") != ""))
         .withColumn("doc_id", monotonically_increasing_id())
)



In [None]:
# 3) Preprocessing: we treat lowercase, strip punctuation & normalise whitespace
def _clean_text(s):
    """Lowercase `s`, remove punctuation and collapse whitespace.

    Returns "" for None/empty input. Every character that is neither a word
    character nor whitespace is removed. Removing punctuation can leave runs
    of spaces ("a - b" -> "a  b") as well as leading/trailing spaces; these
    are collapsed to single spaces and trimmed so that a whitespace-splitting
    tokenizer downstream does not produce empty-string tokens, which would
    otherwise count as a shared "word" between unrelated reviews.
    """
    if not s:
        return ""
    no_punct = re.sub(r"[^\w\s]", "", s.lower())
    return " ".join(no_punct.split())

clean_udf = udf(_clean_text, StringType())
df = df.withColumn("clean_text", clean_udf(col("text")))

In [None]:
# 4) Tokenization: derive a `tokens` column from `clean_text`
tokenizer = Tokenizer(inputCol="clean_text", outputCol="tokens")
df = tokenizer.transform(df)

In [None]:
# 5) Stop-word removal: `tokens` -> `filtered_tokens`
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
df = stopword_remover.transform(df)

In [None]:
# 6) Vectorization: hash `filtered_tokens` into a 2**16-dimensional `features`
#    vector. HashingTF is a pure transformer, so there is no fit step.
hashing_tf = HashingTF(inputCol="filtered_tokens", outputCol="features", numFeatures=2 ** 16)
df = hashing_tf.transform(df)



In [None]:
# 7) Keep only rows whose feature vector has at least one non-zero entry
#    (MinHashLSH needs at least one non-zero), then cache the result.
def _has_nonzero(vec):
    """Return True when the vector contains at least one non-zero entry."""
    return vec.numNonzeros() > 0

hasNZ = udf(_has_nonzero, BooleanType())
df = df.filter(hasNZ(col("features"))).cache()

In [None]:
# 8) MinHashLSH: configure the estimator, fit it on `df`, then add the
#    `hashes` column to every row.
minhash_params = {
    "inputCol": "features",
    "outputCol": "hashes",
    "numHashTables": 3,
    "seed": 42,
}
mh = MinHashLSH(**minhash_params)
model = mh.fit(df)
df_hashed = model.transform(df)

In [None]:
# 9) Approximate self-join, expressed as a distance bound (distance = 1 − similarity)
sim_thresh     = 0.7
dist_threshold = 1.0 - sim_thresh

candidate_pairs = model.approxSimilarityJoin(
    df_hashed, df_hashed, dist_threshold, distCol="distance"
)
# The strict doc_id ordering drops self-matches and keeps each unordered pair once.
is_ordered_pair = col("datasetA.doc_id") < col("datasetB.doc_id")
raw_pairs = candidate_pairs.filter(is_ordered_pair)

In [None]:
# Project each candidate pair to (docA, docB, jaccard, text of the first review),
# with jaccard = 1 − distance, and keep pairs at or above the similarity threshold.
result_columns = [
    col("datasetA.doc_id").alias("docA"),
    col("datasetB.doc_id").alias("docB"),
    (1 - col("distance")).alias("jaccard"),
    col("datasetA.text").alias("Review content"),
]
results = raw_pairs.select(*result_columns).where(col("jaccard") >= sim_thresh)

In [None]:
# Time for results

# `results` feeds two separate actions below (the CSV write and the preview).
# Without caching, each action would re-run the whole pipeline including the
# similarity join, so the DataFrame is marked for caching first.
results.cache()

# We write everything to csv (we split the dataset on 10 files in case of massive output)
(
    results
      .repartition(10)
      .write
      .mode("overwrite")
      .option("header", True)
      .csv("similar_reviews_full")
)

results.show(20, truncate=False)   # show 20 sample rows


+-----+-----+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# We end the session: stop the `spark` session created above. Any further
# Spark work in this notebook needs a new session to be created first.
spark.stop()