<a href="https://colab.research.google.com/github/maedehrabiee/Algorithms-for-Massive-Data-DSE-/blob/main/big_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Cell 1 — Minimal installs for Colab
!pip install -q pyspark findspark kaggle


In [None]:
# Cell 2 — Kaggle auth (robust; works even if Colab renames file to "kaggle (3).json")
from google.colab import files
import os

# Upload once per runtime. Colab may save the file under a new name (e.g. "kaggle (2).json"),
# so read whichever file was actually uploaded instead of assuming "kaggle.json".
uploaded = files.upload()                  # select your kaggle.json
if not uploaded:
    # Without this guard a cancelled upload surfaces as a bare StopIteration below.
    raise RuntimeError("No file uploaded — please select your kaggle.json.")
kaggle_json_name = next(iter(uploaded))    # actual name saved by Colab

# write credentials to ~/.kaggle/kaggle.json
kaggle_dir = os.path.expanduser("~/.kaggle")
os.makedirs(kaggle_dir, exist_ok=True)
target_path = os.path.join(kaggle_dir, "kaggle.json")

# Create the file owner-only (0o600) from the start so the API key is never readable by
# others, even briefly; chmod afterwards covers a pre-existing file with looser permissions.
fd = os.open(target_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, "wb") as f:
    f.write(uploaded[kaggle_json_name])
os.chmod(target_path, 0o600)

# files.upload() also saved a copy in the working directory (see "Saving kaggle.json to ..."
# in the output). Remove it so the key is not left next to the notebook's files.
if (os.path.isfile(kaggle_json_name)
        and os.path.abspath(kaggle_json_name) != os.path.abspath(target_path)):
    os.remove(kaggle_json_name)

print("Kaggle credentials set at:", target_path)


Saving kaggle.json to kaggle (2).json
Kaggle credentials set at: /root/.kaggle/kaggle.json


In [None]:
# Cell 3 — Download dataset from Kaggle and unzip
DATASET_ID = "mohamedbakhet/amazon-books-reviews"
!kaggle datasets download -d {DATASET_ID} -p /content -q
!unzip -o /content/amazon-books-reviews.zip -d /content/data > /dev/null

CSV_PATH = "/content/data/Books_rating.csv"  # adjust if the filename differs
print("CSV path:", CSV_PATH)


Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0
CSV path: /content/data/Books_rating.csv


In [None]:
# Cell 4 — Start Spark (Colab-friendly)
import findspark
findspark.init()  # kept ahead of the pyspark import so findspark can prepare the path first

from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session when this cell is re-run.
spark = SparkSession.builder.appName("AmazonReviewSimilarity-Final").getOrCreate()
print("Spark version:", spark.version)


Spark version: 3.5.1


In [None]:
# Cell 5 — Robust CSV load (multiline-safe) + quick peek
df_raw = (spark.read
          .option("header", True)
          .option("multiLine", True)   # reviews can span multiple lines
          .option("escape", "\"")      # handle embedded quotes properly
          .csv(CSV_PATH))

print("Loaded columns:", df_raw.columns)
# count() is an exact full-scan count, not an estimate.
print("Rows:", df_raw.count())

# Peek without user identifiers, and truncate long fields, so the saved notebook output stays
# small and free of personal data.
_peek_cols = [c for c in df_raw.columns if c not in ("User_id", "profileName")]
df_raw.select(*_peek_cols).show(3, truncate=60)


Loaded columns: ['Id', 'Title', 'Price', 'User_id', 'profileName', 'review/helpfulness', 'review/score', 'review/time', 'review/summary', 'review/text']
Approx rows: 3000000
+----------+------------------------------+-----+--------------+---------------------+------------------+------------+-----------+-----------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Cell 6 — Column auto-detect + rename to a clean schema
from pyspark.sql.functions import col, length

cols = set(df_raw.columns)
def pick(cands):
    """Return the first name in `cands` that is a column of df_raw, or None if none match."""
    for c in cands:
        if c in cols: return c
    return None

text_col  = pick(["reviewText","review_text","review/text","text","review","review_body"])
id_col    = pick(["review_id","Id","id","review/id"])
title_col = pick(["title","review/summary","summary","review_summary"])
# NOTE(review): in Books_rating.csv "Id" appears to be the *book* identifier, repeated across
# every review of that book, so it is used as the last-resort product key. Without it no asin
# column exists and BLOCK_BY_ASIN (Cell 13) silently has nothing to block on.
# Consequence: "review_id" (kept for compatibility) is NOT unique per review with this CSV;
# row_id (Cell 11) is the per-row key.
asin_col  = pick(["asin","product_id","item_id","Id"])

if text_col is None:
    raise ValueError("Text column not found. Please check your CSV headers.")

# Select + rename in one pass. Aliasing (instead of withColumnRenamed) lets one source column
# feed two targets, e.g. "Id" -> review_id and "Id" -> asin.
targets = [(text_col, "review_text"), (id_col, "review_id"),
           (title_col, "title"), (asin_col, "asin")]
df = df_raw.select(*[col(src).alias(dst) for src, dst in targets if src])

# Drop only empty/null texts (do NOT drop duplicates here)
df = df.filter((col("review_text").isNotNull()) & (length(col("review_text")) > 0))

print("Rows after basic filtering:", df.count())
df.show(5, truncate=60)  # truncated: full review texts make the saved output huge


Rows after basic filtering: 2999992
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Cell 7 — Dev/Final switch (default: sample for Colab Free)
# DEV_MODE=True => run on a small sample (fast). DEV_MODE=False => full data (NOT recommended on Colab Free).
DEV_MODE = True
SAMPLE_FRACTION_DEV = 0.01   # 1% ≈ 30k rows out of 3M (good for iteration)
RANDOM_SEED = 42

# Re-running this cell must not sample the sample again (1% of 1% ...). Remember the
# (input, output) pair of the previous run: if `df` is still the frame this cell produced,
# start again from the frame it was derived from; otherwise `df` is fresh (e.g. Cell 6 re-ran).
_prev_input, _prev_output = globals().get("_SAMPLING_STATE", (None, None))
_df_unsampled = _prev_input if (_prev_output is not None and df is _prev_output) else df

if DEV_MODE:
    df = _df_unsampled.sample(False, SAMPLE_FRACTION_DEV, seed=RANDOM_SEED)
    print(f"[DEV] Sampling enabled ({SAMPLE_FRACTION_DEV*100:.1f}%). Proceeding without expensive counts.")
else:
    df = _df_unsampled
    print("[FINAL] Sampling disabled. WARNING: Full dataset is heavy for Colab Free.")

_SAMPLING_STATE = (_df_unsampled, df)


[DEV] Sampling enabled (1.0%). Proceeding without expensive counts.


In [None]:
# Cell 7b — Lightweight performance knobs for a single Colab VM
# A modest shuffle-partition count keeps shuffles cheap on a single machine.
SHUFFLE_PARTITIONS = 64
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)
print("Shuffle partitions:", spark.conf.get("spark.sql.shuffle.partitions"))


Shuffle partitions: 64


In [None]:
# Cell 8 — Idempotent preprocessing: lowercase + Unicode tokenization + stopwords
from pyspark.sql.functions import lower, col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.sql import functions as F

# Re-run safety: remove any output columns left behind by an earlier execution of this cell.
stale = [c for c in ("text_lc", "tokens_raw", "tokens") if c in df.columns]
if stale:
    df = df.drop(*stale)

# Step 1 — lower-case the raw review text.
df = df.withColumn("text_lc", lower(col("review_text")))

# Step 2 — split on every run of characters that are neither Unicode letters (\p{L})
# nor Unicode digits (\p{N}).
regex_tok = RegexTokenizer(inputCol="text_lc", outputCol="tokens_raw",
                           pattern="[^\\p{L}\\p{N}]+", minTokenLength=1)
df = regex_tok.transform(df)

# Step 3 — drop English stop words, ignoring case.
stopword_remover = StopWordsRemover(inputCol="tokens_raw", outputCol="tokens", caseSensitive=False)
df = stopword_remover.transform(df)

# Cheap sanity peek: per-row token count plus the first 10 tokens.
preview = df.select(F.size("tokens").alias("n_tokens"),
                    F.slice("tokens", 1, 10).alias("tokens_preview"))
preview.show(5, truncate=False)


+--------+----------------------------------------------------------------------------------------------+
|n_tokens|tokens_preview                                                                                |
+--------+----------------------------------------------------------------------------------------------+
|112     |[lebron, james, basketball, hero, big, story, got, nba, lebron, poor]                         |
|52      |[book, classic, pertains, building, establishing, relationship, works, partner, written, much]|
|15      |[plot, characters, incredible, everyone, likes, supernatural, read, book, night, world]       |
|67      |[although, title, soul, man, socialism, theoretical, basis, talks, collective, anarchy]       |
|35      |[book, best, funny, first, read, felt, like, going, completely, laugh]                        |
+--------+----------------------------------------------------------------------------------------------+
only showing top 5 rows



In [None]:
# Cell 9 — Cap token length (ON by default for Colab Free)
# This keeps memory/time under control without hurting quality much.
# Use the namespaced F.slice: `from pyspark.sql.functions import slice` would shadow
# Python's built-in slice() for every later cell in the notebook.
from pyspark.sql import functions as F

MAX_TOKENS = 160        # 160 ~ around p90 on your data; use 200 for a bit more recall; None/0 disables
# bool is a subclass of int; exclude it so MAX_TOKENS=True cannot silently mean "keep 1 token".
if isinstance(MAX_TOKENS, int) and not isinstance(MAX_TOKENS, bool) and MAX_TOKENS > 0:
    df = df.withColumn("tokens", F.slice(F.col("tokens"), 1, MAX_TOKENS))


In [None]:
# Cell 10 — Drop empty rows and mark the frame for caching
from pyspark.sql.functions import size
from pyspark import StorageLevel

df = df.filter(size(col("tokens")) > 0).persist(StorageLevel.MEMORY_AND_DISK)

# persist() is lazy: the cache is filled only by actions that scan the data. limit(1).count()
# is a cheap smoke test that the pipeline runs, NOT a full materialization. The cache is
# populated by the first full pass downstream. (No `_ =` assignment: IPython reserves `_`
# for the last displayed output.)
df.limit(1).count()
print("Filtered & persisted (cache fills lazily on the first full pass).")


Filtered & cached (cheap materialization).


In [None]:
# Cell 11 — N-grams + SET + row_id (light defaults)
from pyspark.ml.feature import NGram
from pyspark.sql.functions import concat, array_distinct, monotonically_increasing_id, size
# Namespaced access for slice: importing it bare would shadow Python's built-in slice().
from pyspark.sql import functions as F

USE_BIGRAMS = False     # False by default to be lighter on Colab Free; set True if you need more recall
MAX_TERMS   = 320       # ~2 * MAX_TOKENS (cap the set size). Set None to disable.

# Clean old outputs if re-running
for c in ["bigrams", "all_terms", "row_id"]:
    if c in df.columns:
        df = df.drop(c)

# Build all_terms as a SET (unigrams [+ bigrams if enabled])
if USE_BIGRAMS:
    df = NGram(n=2, inputCol="tokens", outputCol="bigrams").transform(df)
    df = df.withColumn("all_terms", array_distinct(concat(col("tokens"), col("bigrams")))).drop("bigrams")
else:
    df = df.withColumn("all_terms", array_distinct(col("tokens")))

# Optional cap on set size (bool excluded: True must not mean "keep 1 term")
if isinstance(MAX_TERMS, int) and not isinstance(MAX_TERMS, bool) and MAX_TERMS > 0:
    df = df.withColumn("all_terms", F.slice(col("all_terms"), 1, MAX_TERMS))

# MinHash needs at least one term per row
df = df.filter(size(col("all_terms")) > 0)
# monotonically_increasing_id() is unique but not consecutive, and it is derived from partition
# IDs. It is only reproducible while the upstream partitioning stays the same, so treat row_id
# as a per-run key, not a persistent identifier.
df = df.withColumn("row_id", monotonically_increasing_id())

print("all_terms ready (light settings applied).")


all_terms ready (light settings applied).


In [None]:
# Cell 12 — Placeholder that does no work; kept only so later cell numbers stay unchanged.
print("Cell 12:", "noop.")


Cell 12: noop.


In [None]:
# Cell 13 — Parameters tuned for Colab Free
DEV_FLAG = DEV_MODE            # reused below to pick the output directory suffix

# NOTE(review): Spark's MinHashLSH joins take a Jaccard *distance* threshold; confirm the join
# cell converts this similarity value (e.g. 1 - SIM_THRESHOLD).
SIM_THRESHOLD   = 0.75         # higher threshold -> fewer pairs (cheaper)
NUM_HASH_TABLES = 6            # fewer hash tables than 8: faster, still decent recall
NUM_FEATURES    = 262_144      # HashingTF space size (keep as-is)
BLOCK_BY_ASIN   = True         # restrict pairs to the same product when an 'asin' column exists

BASE_OUT   = "/content/similar_reviews_out"
OUTPUT_DIR = f"{BASE_OUT}_{'DEV' if DEV_FLAG else 'FULL'}"

print(dict(SIM_THRESHOLD=SIM_THRESHOLD,
           NUM_HASH_TABLES=NUM_HASH_TABLES,
           NUM_FEATURES=NUM_FEATURES,
           BLOCK_BY_ASIN=BLOCK_BY_ASIN,
           OUTPUT_DIR=OUTPUT_DIR))


{'SIM_THRESHOLD': 0.75, 'NUM_HASH_TABLES': 6, 'NUM_FEATURES': 262144, 'BLOCK_BY_ASIN': True, 'OUTPUT_DIR': '/content/similar_reviews_out_DEV'}


In [None]:
# Cell 14 — Binary features via HashingTF (set logic for Jaccard)
from pyspark.ml.feature import HashingTF

# Columns carried forward, in order: row_id, [review_id], review_text, all_terms, [asin].
# The bracketed ones are included only when present; asin enables optional blocking later.
present = set(df.columns)
cols_to_keep = (["row_id"]
                + (["review_id"] if "review_id" in present else [])
                + ["review_text", "all_terms"]
                + (["asin"] if "asin" in present else []))

# binary=True gives every present term weight 1, i.e. the set representation MinHash/Jaccard uses.
htf = HashingTF(inputCol="all_terms", outputCol="features", numFeatures=NUM_FEATURES, binary=True)

dfv = htf.transform(df.select(*cols_to_keep))
print("Vectorization done (binary features).")


Vectorization done (binary features).


In [None]:
# Cell 15 — Fit MinHashLSH (Jaccard LSH)
from pyspark.ml.feature import MinHashLSH

# Explicit seed for reproducibility. PySpark's default seed comes from hash() of the class
# name, and Python randomizes str hashing per process. Without a seed, the MinHash functions
# (and therefore the candidate pairs) can change after every runtime restart.
mh_model = MinHashLSH(inputCol="features", outputCol="hashes",
                      numHashTables=NUM_HASH_TABLES, seed=RANDOM_SEED).fit(dfv)
print("LSH model ready.")


LSH model ready.
