In [None]:
from google.colab import files
# API and KEY file
files.upload()                                                                  # Upload the API-KEY File

!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Dataset
!kaggle datasets download -d mohamedbakhet/amazon-books-reviews                 # Downloading the dataset (zip file)
!unzip amazon-books-reviews.zip                                                 # Unzipping the dataset

########################################################################

! pip install sparknlp

import sparknlp
spark = sparknlp.start()

#########################################################################

# ----------------------------
# Spark & DataFrame Operations
# ----------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, rand, size, length, expr, lit, split, array, array_except, struct,
    collect_list, regexp_replace, lower, monotonically_increasing_id,
    sha1, concat_ws, xxhash64, udf
)
from pyspark.sql.types import ArrayType, StringType

# --------------------
# Spark MLlib Features
# --------------------
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, MinHashLSH

# ----------------
# NLP - NLTK Setup
# ----------------
import nltk
from nltk.stem import WordNetLemmatizer  # for lemmatization
from nltk.corpus import stopwords
import re                                # regex

# Fetch the NLTK corpora used by WordNetLemmatizer and the stopword list
# (downloaded in this fixed order: wordnet, omw-1.4, stopwords).
for _corpus_name in ("wordnet", "omw-1.4", "stopwords"):
    nltk.download(_corpus_name)

# --------
# Logging
# --------
import logging

from time import time

#########################################################################

In [None]:
# Configuration parameters
#
# These are the default arguments for the pipeline functions below. For the experiments
# we ran the pipeline with several sample fractions, passing those values directly as
# function arguments rather than editing these constants.

USE_SAMPLE_DATA = True                          # True: analyse a random sample; set to False to use the full dataset
SAMPLE_FRACTION = 0.01                          # Fraction of rows kept when USE_SAMPLE_DATA is True
REVIEW_LENGTH = 50                              # Reviews with <= REVIEW_LENGTH characters are dropped
TOKEN_SIZE = 5                                  # Reviews with fewer than TOKEN_SIZE lemmatized tokens are dropped
FEATURES = 4096                                 # Number of features for HashingTF
HASHTABLES = 5                                  # Number of hash tables used for LSH
SIM_THRESHOLD = 0.6                             # Jaccard threshold — NOTE(review): confirm whether callers apply it as a similarity or a distance
path = "Books_rating.csv"                       # CSV extracted from the Kaggle zip by the setup cell

import logging
import sys

# Remove any handlers left over from a previous run so basicConfig takes effect again
# (basicConfig is a no-op when the root logger already has handlers).
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# Plain messages at INFO level, written to stdout so they appear in the notebook output.
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)



##### Functions

In [None]:
# data Loading using Function
def load_sample_preprocess_data(path, USE_SAMPLE_DATA, SAMPLE_FRACTION, seed=42):
    """Load the reviews CSV, drop rows without text, optionally sample, and add ``review_id``.

    Args:
        path: path to the CSV file (read with a header row via the global ``spark`` session).
        USE_SAMPLE_DATA: if True, keep a random ``SAMPLE_FRACTION`` of the rows;
            otherwise use every row that has review text.
        SAMPLE_FRACTION: fraction passed to ``DataFrame.sample`` (Spark does not
            guarantee exactly this fraction of rows).
        seed: sampler seed, for reproducible samples.

    Returns:
        DataFrame with all CSV columns plus ``review_id``: xxhash64 of the SHA-1 of
        ``Id``, ``User_id`` and ``review/time`` joined with "_". When sampling, the
        returned frame itself is persisted, so callers can ``.unpersist()`` it when done.

    NOTE(review): review text often contains quotes, commas and newlines; confirm that the
    default CSV reader options parse this file correctly (e.g. whether ``escape='"'`` or
    ``multiLine=True`` is needed).
    """
    from pyspark.sql.functions import coalesce  # local import keeps this cell self-contained

    raw_df = spark.read.csv(path, header=True)
    cleaned_df = raw_df.filter(col("review/text").isNotNull())          # drop reviews without text

    if USE_SAMPLE_DATA:
        sampled_df = cleaned_df.sample(withReplacement=False, fraction=SAMPLE_FRACTION, seed=seed)
    else:
        sampled_df = cleaned_df

    # concat_ws skips NULL inputs, so ("a", NULL, "b") and ("a", "b", NULL) would both
    # become "a_b" and share an id. df_selection deduplicates on review_id, so that
    # collision would silently drop distinct reviews. Coalescing NULL to "" keeps every
    # field in its own position. Rows with no NULL parts hash exactly as before.
    id_parts = [coalesce(col(name), lit("")) for name in ("Id", "User_id", "review/time")]
    with_ids = sampled_df.withColumn(
        "review_id",
        xxhash64(sha1(concat_ws("_", *id_parts))),
    )

    if USE_SAMPLE_DATA:
        # Cache the frame that is actually returned, so the caller holds a handle it can
        # unpersist. Previously an unreachable intermediate stayed cached forever.
        with_ids = with_ids.persist()

    return with_ids

In [None]:
# Column selection and review-length / duplicate filtering
def df_selection(sampled_df, REVIEW_LENGTH):
    """Keep the columns used downstream, drop short reviews, and deduplicate by id.

    Args:
        sampled_df: DataFrame with ``Id``, ``review/text`` and ``review_id`` columns.
        REVIEW_LENGTH: reviews must be strictly longer than this many characters.

    Returns:
        DataFrame with columns ``book_id``, ``review_text`` and ``review_id``, holding
        at most one row per ``review_id``.
    """
    renamed = sampled_df.select(
        col("Id").alias("book_id"),
        col("review/text").alias("review_text"),
        col("review_id"),
    )

    is_long_enough = length(col("review_text")) > REVIEW_LENGTH

    # `where` is Spark's alias for `filter`
    return renamed.where(is_long_enough).dropDuplicates(["review_id"])

In [None]:
# function for text processing
def text_processing(filtered_reviews, stopwords, TOKEN_SIZE, max_tokens=2000):
    """Clean, tokenize, lemmatize and stopword-filter the ``review_text`` column.

    Args:
        filtered_reviews: DataFrame with a ``review_text`` string column.
        stopwords: iterable of words removed after lemmatization (this parameter
            shadows the ``nltk.corpus.stopwords`` module inside this function only).
        TOKEN_SIZE: minimum number of tokens a review must keep to be retained.
        max_tokens: maximum number of tokens a review may keep (default 2000, the
            previously hard-coded bound).

    Returns:
        The input columns plus ``cleaned_text``, ``tokens`` and ``token_size``, keeping only
        rows with ``TOKEN_SIZE <= token_size <= max_tokens``. The result is persisted, so
        call ``.unpersist()`` when finished with it.

    NOTE(review): relies on a ``lemmatize_udf`` that is not defined in the cells shown
    here; make sure the cell defining it runs before this function is called.
    """
    stopword_array = array(*[lit(w) for w in stopwords])  # Python list -> Spark array literal

    lemmatized_reviews_clean = (
        filtered_reviews
        # Replace everything that is not an ASCII letter or whitespace with a space.
        .withColumn("cleaned_text", regexp_replace(col("review_text"), r'[^a-zA-Z\s]', ' '))
        # Split on runs of whitespace. The regex above keeps newlines and tabs, which a
        # split on a single " " would leave glued inside tokens.
        .withColumn("tokens", split(lower(col("cleaned_text")), r"\s+"))
        # Drop tokens of <= 2 chars; this also removes empty strings from leading whitespace.
        .withColumn("tokens", expr("FILTER(tokens, x -> LENGTH(x) > 2)"))
        .withColumn("tokens", lemmatize_udf(col("tokens")))
        # Per Spark docs, array_except returns distinct elements, so tokens become a set here.
        .withColumn("tokens", array_except(col("tokens"), stopword_array))
        .withColumn("token_size", size(col("tokens")))
        .filter((col("token_size") >= TOKEN_SIZE) & (col("token_size") <= max_tokens))
        .persist()
    )
    return lemmatized_reviews_clean

In [None]:
# create hash functions

import random

def create_hash_functions(num_hashes, max_hash=2**32 - 1, seed=42):
    """Create ``num_hashes`` hash functions of the form h(x) = ((a*x + b) % p) % max_hash.

    The coefficients ``a`` in [1, p-1] and ``b`` in [0, p-1] are drawn from a private
    ``random.Random(seed)``, so the result is reproducible. Unlike calling
    ``random.seed()``, this does not reset the global ``random`` module state. For a
    given seed the coefficients are identical to the ones the global generator would
    produce after ``random.seed(seed)``.

    Args:
        num_hashes: number of hash functions to create.
        max_hash: modulus applied after the prime modulus.
        seed: seed for the coefficient generator (default 42).

    Returns:
        list of ``num_hashes`` callables mapping an int to an int in [0, max_hash).
    """
    p = 4294967311  # A large prime > 2^32
    rng = random.Random(seed)

    hash_funcs = []
    for _ in range(num_hashes):
        a = rng.randint(1, p - 1)
        b = rng.randint(0, p - 1)
        # Default arguments bind the current a/b; a plain closure would see only the last pair.
        hash_funcs.append(lambda x, a=a, b=b: ((a * x + b) % p) % max_hash)

    return hash_funcs


hash_funcs = create_hash_functions(num_hashes=100)

# Min Hashing

def minhash_signature(tokens, hash_funcs):
    """Generate the MinHash signature of a collection of tokens.

    Each token is first mapped to a stable 32-bit integer: the CRC-32 of its UTF-8 text.
    The built-in ``hash()`` is avoided because Python salts str hashes per process
    (unless PYTHONHASHSEED is fixed). With it, signatures computed in different Python
    processes, such as separate worker processes or separate sessions, would not be
    comparable.

    Args:
        tokens: iterable of tokens (each converted with ``str``); iterated once.
        hash_funcs: sequence of callables mapping an int to an int,
            e.g. the output of ``create_hash_functions``.

    Returns:
        list with one value per hash function: the minimum hash over all tokens,
        or ``float('inf')`` for every entry when ``tokens`` is empty.
    """
    import zlib  # local import keeps this cell independent of import order

    # Hash each token once, instead of once per (token, hash function) pair.
    token_ids = [zlib.crc32(str(token).encode("utf-8")) for token in tokens]

    return [min((func(t) for t in token_ids), default=float("inf")) for func in hash_funcs]

# Integrating with pyspark
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, LongType
import pandas as pd

@pandas_udf(ArrayType(LongType()))
def compute_minhash_udf(tokens_series: pd.Series) -> pd.Series:
    """Vectorized Spark UDF: map each row's token array to its MinHash signature.

    Uses the module-level ``hash_funcs`` list, so every signature has
    ``len(hash_funcs)`` entries.
    """
    def _signature_of(row_tokens):
        return minhash_signature(row_tokens, hash_funcs)

    return tokens_series.apply(_signature_of)

In [None]:
# hash tokens 
def hash_tokens(lemmatized_reviews_clean, FEATURES):
    """Return ``lemmatized_reviews_clean`` with an added MinHash ``signature`` column.

    The signature is computed from the ``tokens`` column by ``compute_minhash_udf``,
    which uses the module-level ``hash_funcs`` list; each signature therefore has
    ``len(hash_funcs)`` entries.

    NOTE(review): ``FEATURES`` is accepted only so existing calls keep working.
    It is not used here and does not change the signature length.
    """
    signature_col = compute_minhash_udf(col("tokens"))
    return lemmatized_reviews_clean.withColumn("signature", signature_col)

In [None]:
# LSH
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import hashlib

class LSH:
    """Banded locality-sensitive hashing over fixed-length MinHash signatures.

    Each signature is cut into ``num_bands`` contiguous bands of ``r`` values, where
    ``r`` is inferred from the first signature added. Two signatures become a
    candidate pair when at least one of their bands lands in the same bucket.
    """

    def __init__(self, num_bands):
        """Create an empty index with ``num_bands`` bands.

        Raises:
            ValueError: if ``num_bands`` is less than 1.
        """
        if num_bands < 1:
            raise ValueError("num_bands must be at least 1")
        self.b = num_bands
        # One dict per band: band digest -> ids of the signatures in that bucket.
        self.buckets = [defaultdict(list) for _ in range(num_bands)]
        self.r = None          # rows per band; fixed by the first signature added
        self.id_counter = 0    # id assigned to the next signature (insertion order)

    def _hash_band(self, band):
        """Return the MD5 hex digest of ``str(band)``, used as the bucket key."""
        return hashlib.md5(str(band).encode()).hexdigest()

    def add_signature(self, signature):
        """Add one signature to every band's buckets and return its id (0, 1, 2, ...).

        Raises:
            ValueError: if the first signature is empty or its length is not divisible
                by the number of bands, or a later signature's length differs from
                ``self.b * self.r``.
        """
        if self.r is None:
            if len(signature) == 0 or len(signature) % self.b != 0:
                raise ValueError("Signature length must divide evenly into bands")
            self.r = len(signature) // self.b
        elif len(signature) != self.b * self.r:
            # Otherwise the slices below would silently produce short or misaligned bands.
            raise ValueError(
                f"Signature length {len(signature)} does not match expected {self.b * self.r}"
            )

        sig_id = self.id_counter
        # Hashing a handful of short strings is cheap; doing it inline avoids building
        # a thread pool for every single signature.
        for band_idx in range(self.b):
            band = signature[band_idx * self.r:(band_idx + 1) * self.r]
            self.buckets[band_idx][self._hash_band(band)].append(sig_id)

        self.id_counter += 1
        return sig_id

    def get_candidate_pairs(self):
        """Return the set of ``(smaller_id, larger_id)`` pairs that share at least one bucket."""
        from itertools import combinations

        pairs = set()
        for bucket in self.buckets:
            for ids in bucket.values():
                # Ids are appended in increasing order, so each combination is already
                # (smaller, larger). Buckets with a single id yield nothing.
                pairs.update(combinations(ids, 2))
        return pairs