# Similar Book Reviews using PySpark in Google Colab

This notebook finds pairs of similar book reviews with PySpark. It turns each review's text into a set of words, computes the Jaccard similarity between reviews that share at least one word, and reports the pairs whose similarity is at or above a configurable threshold (`similarity_threshold`).
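
For two reviews with word sets $A$ and $B$, the Jaccard similarity is the number of shared words divided by the number of distinct words across both reviews. A small worked example, with word sets made up for illustration:

```latex
J(A, B) = \frac{|A \cap B|}{|A \cup B|}, \qquad 0 \le J(A, B) \le 1

A = \{\text{great}, \text{story}, \text{characters}\}, \quad
B = \{\text{great}, \text{story}, \text{boring}\}

J(A, B) = \frac{|\{\text{great}, \text{story}\}|}{|\{\text{great}, \text{story}, \text{characters}, \text{boring}\}|} = \frac{2}{4} = 0.5
```

With the threshold of 0.7 used later, this pair would not be reported. The formula is undefined when both sets are empty; the notebook returns 0.0 in that case.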

**Steps:**
1. Install PySpark and `findspark`.
2. Initialize the Spark environment and import dependencies.
3. Define the text preprocessing function, its UDF, and the Jaccard similarity function.
4. Create the SparkSession and load the dataset (e.g., `Books_rating.csv`). **You will need to upload this file or make it accessible from Google Drive.**
5. Preprocess and tokenize review texts.
6. Generate candidate pairs of reviews with an inverted index.
7. Calculate Jaccard similarity for candidate pairs.
8. Filter and display highly similar pairs.
9. Stop the SparkSession.

## 1. Install Spark and findspark

In [None]:
!pip install -q findspark
!pip install -q pyspark

## 2. Initialize Spark Environment

In [None]:
import findspark
findspark.init() # Important: run this before importing pyspark

from pyspark.sql import SparkSession
import re
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType  # PySpark has no SetType; word sets are stored as arrays
from itertools import combinations

## 3. Define Text Preprocessing Functions and Jaccard Similarity

In [None]:
STOP_WORDS = set([
    "a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", 
    "into", "is", "it", "no", "not", "of", "on", "or", "such", "that", "the", 
    "their", "then", "there", "these", "they", "this", "to", "was", "will", "with",
    "i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "your", 
    "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she", 
    "her", "hers", "herself", "it", "its", "itself", "they", "them", "their", 
    "theirs", "themselves", "what", "which", "who", "whom", "this", "that", 
    "these", "those", "am", "is", "are", "was", "were", "be", "been", "being", 
    "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an", 
    "the", "and", "but", "if", "or", "because", "as", "until", "while", "of", 
    "at", "by", "for", "with", "about", "against", "between", "into", "through", 
    "during", "before", "after", "above", "below", "to", "from", "up", "down", 
    "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", 
    "here", "there", "when", "where", "why", "how", "all", "any", "both", "each", 
    "few", "more", "most", "other", "some", "such", "no", "nor", "not", "only", 
    "own", "same", "so", "than", "too", "very", "s", "t", "can", "will", "just", 
    "don", "should", "now", "d", "ll", "m", "o", "re", "ve", "y", "ain", "aren", 
    "couldn", "didn", "doesn", "hadn", "hasn", "haven", "isn", "ma", "mightn", 
    "mustn", "needn", "shan", "shouldn", "wasn", "weren", "won", "wouldn"
])

def preprocess_text(text):
    if text is None:
        return []
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text) # Remove punctuation
    text = re.sub(r'\d+', '', text)      # Remove numbers
    words = text.split()
    processed_words = set(word for word in words if word not in STOP_WORDS and len(word) > 1)
    return list(processed_words)

preprocess_text_udf = udf(preprocess_text, ArrayType(StringType()))

def calculate_jaccard_similarity(set1, set2):
    """Jaccard similarity: intersection size / union size.

    Accepts any iterables (lists or sets). Returns 0.0 if either input is empty.
    """
    set1, set2 = set(set1), set(set2)
    if not set1 or not set2:
        return 0.0
    return len(set1 & set2) / len(set1 | set2)
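
A standalone sketch of the similarity function's edge cases, runnable without Spark. `jaccard` is a hypothetical name that mirrors the logic of `calculate_jaccard_similarity` above, including its convention of returning 0.0 for empty input; the word sets are made up.

```python
def jaccard(a: set, b: set) -> float:
    """Intersection size / union size; 0.0 if either set is empty (the notebook's convention)."""
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)

review_a = {"great", "story", "characters", "ending"}
review_b = {"great", "story", "characters", "plot"}

print(jaccard(review_a, review_b))    # 3 shared / 5 distinct -> 0.6
print(jaccard(review_a, review_a))    # identical sets -> 1.0
print(jaccard(review_a, {"boring"}))  # no shared words -> 0.0
print(jaccard(set(), set()))          # both empty -> 0.0 by this convention
```

Even three shared words out of five distinct ones (0.6) stays below the 0.7 threshold, which shows how strict that setting is.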

## 4. Initialize SparkSession and Load Data

**Important:** 
- Update `dataset_path` to the location of your `Books_rating.csv` file in your Colab environment (e.g., `/content/Books_rating.csv` if uploaded directly, or `/content/drive/MyDrive/path/to/Books_rating.csv` if in Google Drive after mounting).
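- The full dataset is large. The loading cell below keeps only 1,000 reviews with `limit(1000)`; remove or raise that limit only after the pipeline works end to end, because candidate-pair generation grows quadratically with the number of reviews that share a word.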
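
If the CSV lives in Google Drive, Colab's `google.colab.drive.mount` makes it visible under `/content/drive`. A minimal sketch, assuming the file sits at one of the example paths above; `mount_drive_if_in_colab` and `find_dataset` are hypothetical helpers, not part of the notebook.

```python
import os

# Example locations from the note above; adjust to where your copy actually lives.
CANDIDATE_PATHS = [
    "/content/Books_rating.csv",
    "/content/drive/MyDrive/path/to/Books_rating.csv",
]

def mount_drive_if_in_colab(mount_point="/content/drive"):
    """Mount Google Drive when running inside Colab; return False elsewhere."""
    try:
        from google.colab import drive  # importable only inside Colab
    except ImportError:
        return False
    drive.mount(mount_point)
    return True

def find_dataset(paths):
    """Return the first path in `paths` that exists, or None."""
    for path in paths:
        if os.path.exists(path):
            return path
    return None

mount_drive_if_in_colab()
print("Dataset found at:", find_dataset(CANDIDATE_PATHS))
```

If it prints `None`, upload the file or correct the Drive path before setting `dataset_path`.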

In [None]:
spark = SparkSession.builder.appName("SimilarBookReviewsColab").master("local[*]").getOrCreate()
sc = spark.sparkContext

# --- Configuration ---
# !!! IMPORTANT: UPDATE THIS PATH !!!
dataset_path = "/content/Books_rating.csv" # Example path for Colab, replace as needed
similarity_threshold = 0.7

print(f"Starting similar book review detection from: {dataset_path}")
print(f"Similarity threshold: {similarity_threshold}")

try:
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(dataset_path)
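    # Backticks quote column names that contain special characters such as '/'.
    # "Id" identifies the book, and one book can have many reviews, so keep it as book_id
    # and give every row its own unique review_id instead.
    from pyspark.sql.functions import monotonically_increasing_id
    reviews_df = df.select(col("Id").alias("book_id"), col("`review/text`").alias("review_text")) \
        .na.drop(subset=["review_text"]) \
        .withColumn("review_id", monotonically_increasing_id())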
    
    # FOR DEVELOPMENT: Limit data size. Remove or increase for full run.
    reviews_df = reviews_df.limit(1000) 
    print(f"Successfully loaded and selected Id and review/text. Initial count: {reviews_df.count()}")
    reviews_df.show(5, truncate=True)

except Exception as e:
    print(f"Error loading data: {e}")
    print("Please ensure 'Books_rating.csv' is uploaded to Colab and the path is correct.")
    spark.stop()

## 5. Preprocessing & Tokenization

In [None]:
if 'reviews_df' in locals(): # Check if DataFrame exists
    tokenized_reviews_df = reviews_df.withColumn("processed_words", preprocess_text_udf(col("review_text")))
    
    processed_rdd = tokenized_reviews_df.select("review_id", "processed_words").rdd \
        .map(lambda row: (row.review_id, set(row.processed_words))) \
        .filter(lambda x: len(x[1]) > 0) # Filter out reviews that become empty

    processed_rdd.cache()
    print(f"Preprocessing and tokenization complete. RDD count: {processed_rdd.count()}")
    print("Sample of processed RDD:", processed_rdd.take(5))
else:
    print("reviews_df not loaded. Cannot proceed with preprocessing.")
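
A plain-Python sketch of this cell, runnable without Spark: each review becomes a `(review_id, word_set)` pair, and reviews whose set ends up empty are dropped. The tokenizer repeats the cleaning steps of `preprocess_text`; `STOP_WORDS_SUBSET`, `tokenize`, and the sample reviews are illustrative assumptions, not part of the notebook.

```python
import re

# Small stop-word subset for illustration; the notebook uses the full STOP_WORDS set.
STOP_WORDS_SUBSET = {"i", "it", "is", "this", "and", "what", "was", "its", "the"}

def tokenize(text):
    """Lower-case, strip punctuation and digits, drop stop words and 1-char tokens."""
    if text is None:
        return set()
    text = re.sub(r"[^\w\s]", "", text.lower())
    text = re.sub(r"\d+", "", text)
    return {w for w in text.split() if w not in STOP_WORDS_SUBSET and len(w) > 1}

reviews = [
    ("r1", "I loved this book!!! Read it 3 times, and it's GREAT."),
    ("r2", "It is what it is."),  # only stop words -> empty set
    ("r3", "10/10"),              # only digits and punctuation -> empty set
]

# Same shape as processed_rdd: (review_id, word_set), with empty sets filtered out.
processed = [(rid, tokenize(text)) for rid, text in reviews]
processed = [(rid, words) for rid, words in processed if words]

for rid, words in processed:
    print(rid, sorted(words))  # r1 ['book', 'great', 'loved', 'read', 'times']
```

Without the filter, reviews like `r2` and `r3` would reach the similarity step with empty word sets and could only ever score 0.0.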

## 6. Generate Candidate Pairs (Inverted Index approach)

In [None]:
if 'processed_rdd' in locals() and processed_rdd.count() > 0:
    # Map 1: (word, review_id)
    word_to_review_id_rdd = processed_rdd.flatMap(lambda x: [(word, x[0]) for word in x[1]])
    # print("Sample of word_to_review_id_rdd:", word_to_review_id_rdd.take(10))

    # Reduce 1 (Group by word): (word, [review_id1, review_id2, ...])
    word_to_review_ids_list_rdd = word_to_review_id_rdd.groupByKey().mapValues(list)
    # print("Sample of word_to_review_ids_list_rdd:", word_to_review_ids_list_rdd.take(5))

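    # Map 2 (Generate pairs): emit every pair of reviews that share this word. Sorting each
    # pair makes (a, b) and (b, a) identical, so distinct() keeps one copy per pair.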
    candidate_pairs_rdd = word_to_review_ids_list_rdd.flatMap(
        lambda x: [tuple(sorted(pair)) for pair in combinations(x[1], 2)]
    ).distinct()
    
    candidate_pairs_rdd.cache()
    print(f"Candidate pair generation complete. Number of candidate pairs: {candidate_pairs_rdd.count()}")
    # print("Sample of candidate_pairs_rdd:", candidate_pairs_rdd.take(10))
else:
    print("processed_rdd not available or empty. Cannot generate candidate pairs.")
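
The inverted-index steps above (Map 1, Reduce 1, Map 2) can be sketched in plain Python with toy data. Only reviews that share at least one word become candidates; every other pair has a Jaccard similarity of 0 and is never compared.

```python
from collections import defaultdict
from itertools import combinations

# (review_id, word_set) pairs, shaped like processed_rdd (toy data)
processed = [
    ("r1", {"great", "story", "characters"}),
    ("r2", {"great", "story", "plot"}),
    ("r3", {"boring", "plot"}),
    ("r4", {"recipes", "kitchen"}),
]

# Map 1 + Reduce 1: word -> list of review ids containing it (the inverted index)
index = defaultdict(list)
for review_id, words in processed:
    for word in words:
        index[word].append(review_id)

# Map 2: all pairs within each posting list; sorted tuples in a set play the role of distinct()
candidates = {
    tuple(sorted(pair))
    for ids in index.values()
    for pair in combinations(ids, 2)
}
print(sorted(candidates))  # [('r1', 'r2'), ('r2', 'r3')]
```

A word that appears in n reviews contributes n(n-1)/2 pairs, so a single word shared by 10,000 reviews alone yields 49,995,000 candidates. This is why the `limit()` matters on the full dataset.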

## 7. Calculate Jaccard Similarity for Candidate Pairs

In [None]:
if 'candidate_pairs_rdd' in locals() and 'processed_rdd' in locals() and candidate_pairs_rdd.count() > 0:
    review_word_sets_rdd = processed_rdd  # (review_id, word_set)

    # Look up the word sets for both reviews in each candidate pair with two joins.
    # First join on id1 gives (id1, (id2, set1)); re-key it by id2 as (id2, (id1, set1)).
    pairs_with_set1 = candidate_pairs_rdd \
        .join(review_word_sets_rdd) \
        .map(lambda x: (x[1][0], (x[0], x[1][1])))
    
    # Second join on id2 gives (id2, ((id1, set1), set2)); reshape to ((id1, id2), (set1, set2)).
    joined_pairs_with_sets = pairs_with_set1.join(review_word_sets_rdd) \
        .map(lambda x: ((x[1][0][0], x[0]), (x[1][0][1], x[1][1])))

    # Calculate Jaccard similarity. PySpark serializes this lambda together with the
    # globally defined calculate_jaccard_similarity and ships both to the executors.
    similarities_rdd = joined_pairs_with_sets.map(
        lambda x: (x[0], calculate_jaccard_similarity(x[1][0], x[1][1]))
    )
    # print("Sample of similarities_rdd:", similarities_rdd.take(10))
else:
    print("Candidate pairs or processed_rdd not available/empty. Cannot calculate similarity.")
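
The two joins only fetch the word set for each side of a pair. In plain Python the same step is a dictionary lookup, sketched below with toy data. The commented lines show an alternative design for a cluster (not what the notebook does): if all word sets fit in memory, a broadcast variable replaces both joins.

```python
word_sets = {
    "r1": {"great", "story", "characters"},
    "r2": {"great", "story", "plot"},
    "r3": {"boring", "plot"},
}
candidate_pairs = [("r1", "r2"), ("r2", "r3")]

def jaccard(a, b):
    """Intersection size / union size; 0.0 if either set is empty."""
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)

# Equivalent of the two joins: fetch both word sets, then score the pair.
similarities = [
    ((id1, id2), jaccard(word_sets[id1], word_sets[id2]))
    for id1, id2 in candidate_pairs
]
for pair, score in similarities:
    print(pair, round(score, 4))

# Broadcast alternative in PySpark (sketch; assumes the word sets fit in memory):
# lookup = sc.broadcast(processed_rdd.collectAsMap())
# similarities_rdd = candidate_pairs_rdd.map(
#     lambda p: (p, calculate_jaccard_similarity(lookup.value[p[0]], lookup.value[p[1]])))
```

The broadcast version avoids two shuffles but copies every word set to each executor, so it only suits data well below executor memory.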

## 8. Filter and Output Similar Pairs

In [None]:
if 'similarities_rdd' in locals():
    highly_similar_pairs_rdd = similarities_rdd.filter(lambda x: x[1] >= similarity_threshold)

    # Collect once and count locally: similarities_rdd is not cached, so a separate
    # count() followed by collect() would run both joins twice.
    results = highly_similar_pairs_rdd.collect()
    print(f"Found {len(results)} pairs with similarity >= {similarity_threshold}")

    if results:
        print(f"\n--- Highly Similar Review Pairs (Similarity >= {similarity_threshold}) ---")
        for pair, similarity in results:
            print(f"Review Pair: {pair[0]} - {pair[1]}, Similarity: {similarity:.4f}")
    else:
        print("No pairs found above the similarity threshold.")
else:
    print("similarities_rdd not available. Cannot filter results.")
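
The same filter in plain Python with made-up scores, plus a ranking step the notebook does not perform: sorting by similarity puts the closest pairs first. Because the comparison is `>=`, a pair scoring exactly 0.7 is kept.

```python
similarities = [
    (("r1", "r2"), 0.5),
    (("r5", "r9"), 0.92),
    (("r3", "r7"), 0.7),
    (("r2", "r3"), 0.25),
]
similarity_threshold = 0.7

# Keep pairs at or above the threshold, highest similarity first.
highly_similar = sorted(
    (item for item in similarities if item[1] >= similarity_threshold),
    key=lambda item: item[1],
    reverse=True,
)
for (id1, id2), score in highly_similar:
    print(f"Review Pair: {id1} - {id2}, Similarity: {score:.4f}")
```

On a large RDD, `highly_similar_pairs_rdd.takeOrdered(20, key=lambda x: -x[1])` returns only the 20 most similar pairs instead of collecting every match to the driver.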

## 9. Stop Spark Session

In [None]:
if 'spark' in locals():
    spark.stop()
    print("Spark session stopped.")