# **Configuration**

In [0]:
# === CONFIG ===
storage_acct = "goodreadsreviews60107070"

spark.conf.set(
    "fs.azure.account.key.goodreadsreviews60107070.dfs.core.windows.net",
    "W68YOwummMkTfxvE8uSyeHwSn2ISU3fxF43SpgTIU/zdUDmwquZ95QpaxDJnze6PRovNww3bWamU+AStmbtZLg=="
)

container   = "lakehouse"
silver_path = f"abfss://{container}@{storage_acct}.dfs.core.windows.net/processed"
gold_path   = f"abfss://{container}@{storage_acct}.dfs.core.windows.net/gold"

# Output locations for splits
features_v2_base = f"{gold_path}/features_v2"
train_out = f"{features_v2_base}/train"
val_out   = f"{features_v2_base}/val"
test_out  = f"{features_v2_base}/test"

# **II. Splitting the dataset**

In [0]:
# === 1) LOAD CLEAN SOURCE ===
# Use features_v1 as the leakage-safe source for downstream featurization
df = (
    spark.read.format("delta")
    .load(f"{gold_path}/features_v1")  # load by path instead of table()
)

# --- Optional: Basic hygiene filters ---
from pyspark.sql import functions as F

df = (
    df.dropDuplicates(["review_id"])
      .filter(F.col("review_text").isNotNull() & (F.length(F.col("review_text")) >= 10))
      .filter(F.col("rating").isNotNull())
)

# Verify schema and sample
df.printSchema()
df.show(5, truncate=False)

In [0]:
# === 2) MAKE REPRODUCIBLE SPLITS (70/15/15) ===
# Note: split BEFORE TF-IDF or encoders to avoid data leakage

from pyspark.sql import functions as F

seed = 67

splits = (
    df.withColumn("_rand", F.rand(seed))
      .withColumn(
          "_split",
          F.when(F.col("_rand") < 0.70, F.lit("train"))
           .when(F.col("_rand") < 0.85, F.lit("val"))
           .otherwise(F.lit("test"))
      )
)

train_df = splits.filter(F.col("_split") == "train").drop("_rand", "_split")
val_df   = splits.filter(F.col("_split") == "val").drop("_rand", "_split")
test_df  = splits.filter(F.col("_split") == "test").drop("_rand", "_split")

# Optional sanity check
print("Train:", train_df.count(), "Val:", val_df.count(), "Test:", test_df.count())

In [0]:
# === 3) WRITE SPLITS TO GOLD/features_v2 ===
# Overwrite to keep paths stable while iterating

out_path = f"{gold_path}/features_v2"

train_out = f"{out_path}/train"
val_out   = f"{out_path}/val"
test_out  = f"{out_path}/test"

(train_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(train_out))

(val_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(val_out))

(test_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(test_out))

In [0]:
# === 4) QUICK VERIFICATION ===
def load_and_count(p):
    df = spark.read.format("delta").load(p)
    return df, df.count()

train_loaded, n_train = load_and_count(train_out)
val_loaded,   n_val   = load_and_count(val_out)
test_loaded,  n_test  = load_and_count(test_out)

print("Split counts →",
      "train:", n_train,
      "val:",   n_val,
      "test:",  n_test,
      "total:", n_train + n_val + n_test)

# Peek a few rows to ensure schema/fields look right
train_loaded.show(5, truncate=False)
val_loaded.show(5, truncate=False)
test_loaded.show(5, truncate=False)

In [0]:
# === 5) SAVE SPLIT MANIFEST WITH COUNTS + PERCENTAGES ===
total_records = n_train + n_val + n_test

manifest_data = [
    ("train", n_train, round((n_train / total_records) * 100, 2)),
    ("val",   n_val,   round((n_val / total_records) * 100, 2)),
    ("test",  n_test,  round((n_test / total_records) * 100, 2))
]

manifest = spark.createDataFrame(
    manifest_data,
    ["split", "count", "percentage"]
)

(manifest.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .save(f"{features_v2_base}/_manifest_counts"))

manifest.show(truncate=False)

# **III. Text Feature Extraction in Databricks**

In [0]:
# Databricks notebook source
# =========================================================
# GOODREADS TEXT FEATURE EXTRACTION
# =========================================================
# Purpose:
#   Load train split from feature_v2 (Gold layer)
#   Work with review_text column for NLP feature engineering
# =========================================================

# === 1. CONFIGURE STORAGE ACCESS ===
storage_acct = "goodreadsreviews60107070"

spark.conf.set(
    f"fs.azure.account.key.{storage_acct}.dfs.core.windows.net",
    "W68YOwummMkTfxvE8uSyeHwSn2ISU3fxF43SpgTIU/zdUDmwquZ95QpaxDJnze6PRovNww3bWamU+AStmbtZLg=="
)

container = "lakehouse"
gold_path = f"abfss://{container}@{storage_acct}.dfs.core.windows.net/gold"
train_path = f"{gold_path}/features_v2/train"

In [0]:
# === 2. LOAD DATASET (feature_v2/train) ===
train_df = spark.read.format("delta").load(train_path)

print("Total records:", train_df.count())
train_df.printSchema()

Total records: 10480029
root
 |-- book_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: string (nullable = true)
 |-- date_added_parsed: timestamp (nullable = true)
 |-- date_added_iso: date (nullable = true)
 |-- review_length: integer (nullable = true)
 |-- word_count: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- avg_rating_per_book: double (nullable = true)
 |-- n_reviews_per_book: long (nullable = true)



## **III.3. Perform text cleaning and normalization**

In [0]:
# === 3. TEXT CLEANING & NORMALIZATION (review_text) ===
# If not already available on the cluster, install once:
%pip install emoji==2.14.0

import re
from pyspark.sql.functions import col, length, lower, regexp_replace, trim
from pyspark.sql import functions as F

# --- 3.1 Define regex patterns (Java/PCRE compatible for Spark) ---
URL_PATTERN   = r'(https?://\S+|www\.\S+)'
NUM_PATTERN   = r'\d+'
# Remove punctuation EXCEPT < and > so placeholders like <URL> survive
PUNCT_EXCEPT_PLACEHOLDERS = r'[\\p{Punct}&&[^<>]]'

# --- 3.2 Emoji replacement via Python UDF (uses emoji lib) ---
# Databricks note: ensure `emoji` package is installed on the cluster
try:
    import emoji
except Exception as e:
    raise RuntimeError("Install the `emoji` package on the cluster: %pip install emoji==2.14.0") from e

def replace_emojis_to_placeholder(text: str) -> str:
    if text is None:
        return None
    # replace each emoji grapheme with <EMOJI>
    # emoji.replace_emoji handles combined emojis and skin tones correctly
    return emoji.replace_emoji(text, replace='<EMOJI>')

replace_emojis_udf = F.udf(replace_emojis_to_placeholder, returnType=F.StringType())

# --- 3.3 Apply cleaning pipeline in order ---
# Order matters: placeholders first, then punctuation/spacing, then trim + filter
cleaned_df = (
    train_df
      .withColumn("raw_text", col("review_text"))
      # lowercase
      .withColumn("clean_text", lower(col("review_text")))
      # URLs -> <URL>
      .withColumn("clean_text", regexp_replace(col("clean_text"), URL_PATTERN, " <URL> "))
      # numbers -> <NUM>
      .withColumn("clean_text", regexp_replace(col("clean_text"), NUM_PATTERN, " <NUM> "))
      # emojis -> <EMOJI> (UDF)
      .withColumn("clean_text", replace_emojis_udf(col("clean_text")))
      # remove punctuation except <> to keep placeholders
      .withColumn("clean_text", regexp_replace(col("clean_text"), PUNCT_EXCEPT_PLACEHOLDERS, " "))
      # collapse multiple spaces
      .withColumn("clean_text", regexp_replace(col("clean_text"), r"\s+", " "))
      # trim
      .withColumn("clean_text", trim(col("clean_text")))
      # filter out empty or very short reviews (<10 chars)
      .filter(length(col("clean_text")) >= 10)
)

# --- 3.4 Quick sanity checks ---
print("After cleaning:", cleaned_df.count())
display(
    cleaned_df.select("review_id", "raw_text", "clean_text").limit(10)
)

In [0]:
# === 3.5 SAVE CLEANED TEXT (ALL COLUMNS) ===
cleaned_out_path = f"{gold_path}/features_v2/text_cleaned"

(
    cleaned_df
    .write
    .mode("overwrite")
    .format("delta")
    .save(cleaned_out_path)
)

print(f"Full cleaned dataset saved to: {cleaned_out_path}")

In [0]:
train_raw = spark.read.format("delta").load(f"{gold_path}/features_v2/train")
train_count = train_raw.select("review_id").distinct().count()
print("Raw train count:", train_count)


Raw train count: 10480029


## **III.4.Extract text-based features**

### **III.4.a. Basic Text Features**

In [0]:
# === III 4a. BASIC TEXT FEATURES ===
from pyspark.sql import functions as F

text_basic_df = (
    cleaned_df
    .withColumn("review_length_words", F.size(F.split(F.col("clean_text"), r"\s+")))
    .withColumn("review_length_chars", F.length(F.col("clean_text")))
    .filter(F.col("review_length_words") > 0)
)

# Quick sample
display(
    text_basic_df.select("review_id","clean_text","review_length_words","review_length_chars").limit(10)
)

# Summary (use percentile_approx for median)
summary_df = (
    text_basic_df.agg(
        F.count("*").alias("n_rows"),
        F.avg("review_length_words").alias("avg_words"),
        F.percentile_approx("review_length_words", 0.5).alias("p50_words"),
        F.max("review_length_words").alias("max_words"),
        F.avg("review_length_chars").alias("avg_chars"),
        F.max("review_length_chars").alias("max_chars"),
    )
)
display(summary_df)

# Save (ALL columns retained + new features); path aligned with earlier convention
basic_out = f"{gold_path}/features_v2/text_basic"
(text_basic_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(basic_out))

print("Basic text features saved to:", basic_out)

# Optional reload
reloaded = spark.read.format("delta").load(basic_out)
reloaded.printSchema()
print("Count:", reloaded.count())


### **III.4.b. Sentiment Features**

In [0]:
# === 4b. SENTIMENT FEATURES (VADER) ===
# If not installed on cluster:
%pip install nltk==3.9.1

import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, DoubleType

# Download VADER lexicon if not already available
nltk.download("vader_lexicon")

# Initialize analyzer once (broadcast to executors)
sia = SentimentIntensityAnalyzer()

# Define function to compute sentiment scores
def vader_scores(text):
    if text is None:
        return (0.0, 0.0, 0.0, 0.0)
    s = sia.polarity_scores(text)
    return (float(s["pos"]), float(s["neu"]), float(s["neg"]), float(s["compound"]))

schema = StructType([
    StructField("sentiment_pos", DoubleType(), True),
    StructField("sentiment_neu", DoubleType(), True),
    StructField("sentiment_neg", DoubleType(), True),
    StructField("sentiment_compound", DoubleType(), True),
])

vader_udf = F.udf(vader_scores, schema)

# Apply to dataset
sentiment_df = (
    text_basic_df
    .withColumn("sentiment", vader_udf(F.col("clean_text")))
    .withColumn("sentiment_pos", F.col("sentiment.sentiment_pos"))
    .withColumn("sentiment_neu", F.col("sentiment.sentiment_neu"))
    .withColumn("sentiment_neg", F.col("sentiment.sentiment_neg"))
    .withColumn("sentiment_compound", F.col("sentiment.sentiment_compound"))
    .drop("sentiment")
)

# Quick inspection
display(
    sentiment_df.select(
        "review_id", "clean_text",
        "sentiment_pos", "sentiment_neu", "sentiment_neg", "sentiment_compound"
    ).limit(10)
)

# Save sentiment-enriched data
sentiment_out = f"{gold_path}/features_v2/text_sentiment"
(sentiment_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(sentiment_out))

print("Sentiment features saved to:", sentiment_out)


In [0]:
sentiment_df = spark.read.format("delta").load(f"{gold_path}/features_v2/text_sentiment")
sentiment_df.printSchema()
print("Total records:", sentiment_df.count())
display(sentiment_df.limit(5))

root
 |-- book_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: string (nullable = true)
 |-- date_added_parsed: timestamp (nullable = true)
 |-- date_added_iso: date (nullable = true)
 |-- review_length: integer (nullable = true)
 |-- word_count: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- avg_rating_per_book: double (nullable = true)
 |-- n_reviews_per_book: long (nullable = true)
 |-- raw_text: string (nullable = true)
 |-- clean_text: string (nullable = true)
 |-- review_length_chars: integer (nullable = true)
 |-- sentiment_pos: double (nullable = true)
 |-- sentiment_neu: double (nullable =

book_id,review_id,title,author_id,name,user_id,rating,review_text,language_code,n_votes,date_added,date_added_parsed,date_added_iso,review_length,word_count,review_length_words,avg_rating_per_book,n_reviews_per_book,raw_text,clean_text,review_length_chars,sentiment_pos,sentiment_neu,sentiment_neg,sentiment_compound
18375252,bec7e62ae812fada353b8456d2371a87,Au Revoir Là-haut,822613,Pierre Lemaitre,c55bde87d5dce88e7b7a4ba5a4d2257d,1,banalities on banalities old tunes nothing new. dumped.,fre,0,Sun Mar 05 01:41:18 -0800 2017,2017-03-05T09:41:18Z,2017-03-05,55,8,16,4.225806451612903,31,banalities on banalities old tunes nothing new. dumped.,ba ali ies o ba ali ies old es o hi g ew. d m ed.,49,0.0,1.0,0.0,0.0
41865,9f2ca2cc167c2f16892858c09fe51b7c,"Twilight (twilight, #1)",941441,Stephenie Meyer,3b92b0352627e473e429e80ff1ef7dd5,2,"this is a book made for teenagers. the whole notion that you can have a relationship based on platonic love is very appealing for teenagers, specially girls, that feel targeted for their looks only. of course let's put in the mix some seriously sexy pretty vampire, that is rich and all powerful and that adores and is crazy about the main girl character, and the cocktail is just perfect. it is not very understandable this all powerful, pretty vampire is so crazy about this boringly plain girl, unless you read the book and realize he kind of didn't have much to choose from.... all said, of course we all have a teenager in our hearts and this is a very easy book to read. unless you get bored by teenager angst.",en-us,0,Tue Sep 10 00:39:23 -0700 2013,2013-09-10T07:39:23Z,2013-09-10,717,131,177,3.4414496264889967,9906,"this is a book made for teenagers. the whole notion that you can have a relationship based on platonic love is very appealing for teenagers, specially girls, that feel targeted for their looks only. of course let's put in the mix some seriously sexy pretty vampire, that is rich and all powerful and that adores and is crazy about the main girl character, and the cocktail is just perfect. it is not very understandable this all powerful, pretty vampire is so crazy about this boringly plain girl, unless you read the book and realize he kind of didn't have much to choose from.... all said, of course we all have a teenager in our hearts and this is a very easy book to read. unless you get bored by teenager angst.","his is a book made for ee agers. he whole o io ha yo a have a rela io shi based o la o i love is very a eali g for ee agers, s e ially girls, ha feel arge ed for heir looks o ly. of o rse le 's i he mix some serio sly sexy re y vam ire, ha is ri h a d all owerf l a d ha adores a d is razy abo he mai girl hara er, a d he o k ail is j s erfe . i is o very ders a dable his all owerf l, re y vam ire is so razy abo his bori gly lai girl, less yo read he book a d realize he ki d of did ' have m h o hoose from.... all said, of o rse we all have a ee ager i o r hear s a d his is a very easy book o read. less yo ge bored by ee ager a gs .",637,0.187,0.8,0.013,0.9743
30109238,804b10dcc6b047605074a5c833373384,"Lake Of Dreams (fortune Bay, Prequel Novella)",15240324,Judith Hudson,600c811b96fed8dd0181b7024aac0524,5,very nice start to a series. love the setting of the book. looking forward to reading more.,eng,0,Thu Sep 08 11:35:19 -0700 2016,2016-09-08T18:35:19Z,2016-09-08,91,17,23,4.4,5,very nice start to a series. love the setting of the book. looking forward to reading more.,very i e s ar o a series. love he se i g of he book. looki g forward o readi g more.,84,0.271,0.729,0.0,0.6666
15803173,281289cad67a54610e731307e42bf728,Golden Boy,4818033,Abigail Tarttelin,17aaae5b58b453a8cdd4bc54c2ff3f0b,5,great book! so much food for thought regarding gender and identity... this book is powerful and touching.,eng,0,Sat Nov 21 23:16:47 -0800 2015,2015-11-22T07:16:47Z,2015-11-22,105,17,28,4.32258064516129,279,great book! so much food for thought regarding gender and identity... this book is powerful and touching.,grea book! so m h food for ho gh regardi g ge der a d ide i y... his book is owerf l a d o hi g.,96,0.0,1.0,0.0,0.0
1158706,e737dd555f23e1eaf45647e0cfaeb297,"Strangers In Death (in Death, #26)",17065,J.d. Robb,fca26c34be8fe623ee340061f1281796,4,"strangers in death (police proc-eve dallas-nyc-2060) - vg robb, j.d. (aka nora roberts) - 26th in series g.p. putnam's sons, 2008, us hardcover - isbn: 9780399154706 first sentence: murder harbored no bigotry, no bias. when a wealthy man is found murdered in his apartment, lt. eve dallas first looks to the wife as a suspect. the wife, however, was out of the country with friends and has an air-tight alibi. dallas has a feeling, however, and a determination to find justice for the victim. i have long admitted to being of fan of this series and this book doesn't change that. the strengths are all there; crisp dialogue with wonderful interjections of humor, wonderful characters and the portrayal of the relationship between them, the fun slightly-futuristic-but-not-unbelievable technology and, yes, some nice scenes between eve and her husband, roarke. the plot didn't have the same emotional charge some have had, but it did have a delightfully twisted villain. a slight weakness was whomever relied on spell-check to catch errors (hear versus here), but that's minor. somewhat more disappointing was that i saw where the plot was going a bit earlier than i'd have liked. however, that didn't prevent my reading the book all in one day and enjoying it.",eng,0,Tue Mar 11 22:21:13 -0700 2008,2008-03-12T05:21:13Z,2008-03-12,1265,208,324,3.953488372093023,129,"strangers in death (police proc-eve dallas-nyc-2060) - vg robb, j.d. (aka nora roberts) - 26th in series g.p. putnam's sons, 2008, us hardcover - isbn: 9780399154706 first sentence: murder harbored no bigotry, no bias. when a wealthy man is found murdered in his apartment, lt. eve dallas first looks to the wife as a suspect. the wife, however, was out of the country with friends and has an air-tight alibi. dallas has a feeling, however, and a determination to find justice for the victim. i have long admitted to being of fan of this series and this book doesn't change that. the strengths are all there; crisp dialogue with wonderful interjections of humor, wonderful characters and the portrayal of the relationship between them, the fun slightly-futuristic-but-not-unbelievable technology and, yes, some nice scenes between eve and her husband, roarke. the plot didn't have the same emotional charge some have had, but it did have a delightfully twisted villain. a slight weakness was whomever relied on spell-check to catch errors (hear versus here), but that's minor. somewhat more disappointing was that i saw where the plot was going a bit earlier than i'd have liked. however, that didn't prevent my reading the book all in one day and enjoying it.","s ra gers i dea h ( oli e ro -eve dallas- y - ) - vg robb, j.d. (aka ora rober s) - h i series g. . am's so s, , s hard over - isb : firs se e e: m rder harbored o bigo ry, o bias. whe a weal hy ma is fo d m rdered i his a ar me , l . eve dallas firs looks o he wife as a s s e . he wife, however, was o of he o ry wi h frie ds a d has a air- igh alibi. dallas has a feeli g, however, a d a de ermi a io o fi d j s i e for he vi im. i have lo g admi ed o bei g of fa of his series a d his book does ' ha ge ha . he s re g hs are all here; ris dialog e wi h wo derf l i erje io s of h mor, wo derf l hara ers a d he or rayal of he rela io shi be wee hem, he f sligh ly-f ris i -b - o - believable e h ology a d, yes, some i e s e es be wee eve a d her h sba d, roarke. he lo did ' have he same emo io al harge some have had, b i did have a deligh f lly wis ed villai . a sligh weak ess was whomever relied o s ell- he k o a h errors (hear vers s here), b ha 's mi or. somewha more disa oi i g was ha i saw where he lo was goi g a bi earlier ha i'd have liked. however, ha did ' reve my readi g he book all i o e day a d e joyi g i .",1155,0.089,0.873,0.037,0.8894


### **III.4.c. TF-IDF Features**

In [0]:
# === TF-IDF on TRAIN (step-by-step; built-in stopwords; bigrams via SQL) ===
from pyspark.sql import functions as F
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.types import IntegerType

# 0) Optional: silence MLflow autolog noise in Databricks
try:
    import mlflow
    mlflow.autolog(disable=True)
except Exception:
    pass

# 1) Load cleaned data and restrict to TRAIN
cleaned = (spark.read.format("delta")
           .load(f"{gold_path}/features_v2/text_cleaned")
           .select("review_id","clean_text"))

train_ids = (spark.read.format("delta")
             .load(f"{gold_path}/features_v2/train")
             .select("review_id").distinct())

train_df = train_ids.join(cleaned, "review_id", "inner")
print("Loaded cleaned train reviews:", train_df.count())

# 2) Tokenize (tokens)
tok = RegexTokenizer(inputCol="clean_text", outputCol="tokens", pattern=r"\s+", minTokenLength=1)
df1 = tok.transform(train_df).select("review_id","tokens")

# 3) Stopwords (tokens_sw) — built-in English
sw = StopWordsRemover(
    inputCol="tokens",
    outputCol="tokens_sw",
    stopWords=StopWordsRemover.loadDefaultStopWords("english")
)
df2 = sw.transform(df1).select("review_id","tokens_sw")

# 4) Bigrams (bigrams) via SQL expr
df3 = df2.withColumn(
    "bigrams",
    F.expr("""
      CASE WHEN size(tokens_sw) >= 2
           THEN transform(sequence(0, size(tokens_sw)-2),
                          i -> concat(tokens_sw[i], ' ', tokens_sw[i+1]))
           ELSE array()
      END
    """)
)

# 5) Combine unigrams + bigrams (uni_bi)
feat = (df3
        .withColumn("uni_bi", F.concat(F.col("tokens_sw"), F.col("bigrams")))
        .select("review_id","uni_bi"))

# 6) TF → IDF → TF-IDF (fit on full TRAIN, no sampling)
cv = CountVectorizer(inputCol="uni_bi", outputCol="tf", vocabSize=200_000, minDF=5)
cv_model = cv.fit(feat)
tf = cv_model.transform(feat).select("review_id","tf")

idf = IDF(inputCol="tf", outputCol="tfidf", minDocFreq=5)
idf_model = idf.fit(tf)
tfidf_df = idf_model.transform(tf).select("review_id","tfidf")

# 7) Save TF-IDF and verify row parity
out_train = f"{gold_path}/features_v2/train_text_tfidf"
tfidf_df.write.mode("overwrite").format("delta").save(out_train)

rows = tfidf_df.select("review_id").distinct().count()
expected = train_df.select("review_id").distinct().count()
print({"rows": rows, "expected": expected, "ok": rows == expected})

# 8) NNZ summary WITHOUT Spark SQL method parsing (safe UDF path)
out = spark.read.format("delta").load(out_train)

@F.udf(IntegerType())
def nnz(v):
    if v is None:
        return 0
    if isinstance(v, SparseVector):
        return int(len(v.indices))
    return int(sum(1 for x in v if x != 0.0))  # dense fallback (unlikely)

# type check
assert isinstance(out.schema["tfidf"].dataType, VectorUDT), "tfidf column is not VectorUDT."

(out.withColumn("nnz", nnz(F.col("tfidf")))
    .select("nnz")
    .summary("count","min","max","mean","stddev")
    .show())

# 9) Optional: quick vocab peek
print("vocab_size:", len(cv_model.vocabulary))
spark.createDataFrame([(i, t) for i, t in enumerate(cv_model.vocabulary[:20])],
                      ["term_index","term"]).show(truncate=False)

Loaded cleaned train reviews: 10442623
{'rows': 10442623, 'expected': 10442623, 'ok': True}
+-------+------------------+
|summary|               nnz|
+-------+------------------+
|  count|          10442623|
|    min|                 0|
|    max|              2975|
|   mean| 160.0799581675983|
| stddev|185.84002166493258|
+-------+------------------+

vocab_size: 200000
+----------+----+
|term_index|term|
+----------+----+
|0         |o   |
|1         |e   |
|2         |d   |
|3         |g   |
|4         |h   |
|5         |ha  |
|6         |.   |
|7         |b   |
|8         |io  |
|9         |wi  |
|10        |y   |
|11        |,   |
|12        |k   |
|13        |r   |
|14        |er  |
|15        |hi  |
|16        |ed  |
|17        |yo  |
|18        |wi h|
|19        |'   |
+----------+----+



### **III.4.d. Semantic Embedding Features**

In [0]:
# Safety: moderate Arrow batches
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "512")

In [0]:
from pyspark.sql import functions as F, Window as W

# Use the sentiment-enriched table
source_df = spark.read.format("delta").load(f"{gold_path}/features_v2/text_sentiment") \
                     .select("review_id","clean_text") \
                     .dropna(subset=["clean_text"])

# Stable row index for chunking (order by review_id for determinism)
w = W.orderBy("review_id")
indexed_df = source_df.withColumn("row_idx", F.row_number().over(w) - 1).cache()
total_rows = indexed_df.count()
print("Total rows:", total_rows)

Total rows: 10442623


In [0]:
emb_out = f"{gold_path}/features_v2/text_embeddings_sbert"

# Create empty table if not exists (schema)
empty_df = indexed_df.limit(0).withColumn("bert_embedding", F.array().cast(ArrayType(FloatType())))
(empty_df
    .select("review_id","clean_text","bert_embedding")
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema","true")
    .save(emb_out))

# Track progress in DBFS
progress_path = "dbfs:/tmp/emb_progress.txt"
def write_progress(msg):
    dbutils.fs.put(progress_path, msg, True)
def read_progress():
    try:
        return dbutils.fs.head(progress_path)
    except:
        return ""

In [0]:
# --- Speed knobs ---
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")
spark.conf.set("spark.sql.shuffle.partitions", "200")  # moderate shuffle fanout

# SBERT params (keep model same)
MODEL_NAME   = "sentence-transformers/all-MiniLM-L6-v2"
USE_GPU      = False          # set True if you have GPUs
BATCH_SIZE   = 128            # ↑ batch
REPARTITIONS = 32             # fewer, larger partitions
CHUNK_SIZE   = 300_000        # ↑ chunk size to reduce driver/job overhead

# Ensure base/indexed DF prepared once (outside loop), cached:
# indexed_df: columns [review_id, clean_text, row_idx] with row_idx = row_number()-1, cache() called.

# UDF (ensure global cache works)
_model = None
def _get_model():
    global _model
    if _model is None:
        from sentence_transformers import SentenceTransformer
        device = "cuda" if USE_GPU else "cpu"
        # Optional: cache model to DBFS to avoid repeated downloads across clusters
        _model = SentenceTransformer(MODEL_NAME, device=device)
    return _model

import pandas as pd
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import pandas_udf, col

@pandas_udf(ArrayType(FloatType()))
def sbert_embed(texts: pd.Series) -> pd.Series:
    m = _get_model()
    embs = m.encode(
        texts.fillna("").astype(str).tolist(),
        batch_size=BATCH_SIZE,
        show_progress_bar=False,
        convert_to_numpy=True,
        normalize_embeddings=True,
    )
    return pd.Series([e.astype("float32").tolist() for e in embs])

# Destination (append mode)
emb_out = f"{gold_path}/features_v2/text_embeddings_sbert"

from math import ceil
total_rows = indexed_df.count()
num_chunks = ceil(total_rows / CHUNK_SIZE)
print(f"Planned chunks: {num_chunks} of ~{CHUNK_SIZE} rows")

for k in range(num_chunks):
    start = k * CHUNK_SIZE
    end   = min((k+1) * CHUNK_SIZE, total_rows) - 1

    done = read_progress()
    tag = f"[{start}-{end}]"
    if tag in done:
        print(f"Skip chunk {tag}")
        continue

    print(f"Processing chunk {tag} ...")
    # No extra shuffle columns; single repartition once per chunk
    chunk_df = (
        indexed_df
          .where((F.col("row_idx") >= start) & (F.col("row_idx") <= end))
          .select("review_id","clean_text")
          .repartition(REPARTITIONS)
    )

    emb_df = chunk_df.withColumn("bert_embedding", sbert_embed(col("clean_text")))

    (emb_df
        .select("review_id","clean_text","bert_embedding")
        .write
        .format("delta")
        .mode("append")
        .save(emb_out))

    write_progress(done + f"{tag} ")
    print(f"Done chunk {tag}")


Planned chunks: 35 of ~300000 rows
Processing chunk [0-299999] ...
Wrote 85 bytes.
Done chunk [0-299999]
Processing chunk [300000-599999] ...
Wrote 101 bytes.
Done chunk [300000-599999]
Processing chunk [600000-899999] ...
Wrote 117 bytes.
Done chunk [600000-899999]
Processing chunk [900000-1199999] ...
Wrote 134 bytes.
Done chunk [900000-1199999]
Processing chunk [1200000-1499999] ...
Wrote 152 bytes.
Done chunk [1200000-1499999]
Processing chunk [1500000-1799999] ...
Wrote 170 bytes.
Done chunk [1500000-1799999]
Processing chunk [1800000-2099999] ...
Wrote 188 bytes.
Done chunk [1800000-2099999]
Processing chunk [2100000-2399999] ...
Wrote 206 bytes.
Done chunk [2100000-2399999]
Processing chunk [2400000-2699999] ...
Wrote 224 bytes.
Done chunk [2400000-2699999]
Processing chunk [2700000-2999999] ...
Wrote 242 bytes.
Done chunk [2700000-2999999]
Processing chunk [3000000-3299999] ...
Wrote 260 bytes.
Done chunk [3000000-3299999]
Processing chunk [3300000-3599999] ...
Wrote 278 bytes.

### sanity checks n verifications n corrections

In [0]:
from pyspark.sql import functions as F

# === Load embeddings table ===
emb = spark.read.format("delta").load(f"{gold_path}/features_v2/text_embeddings_sbert")

# === 1) Schema and sample ===
emb.printSchema()
emb.select("review_id", F.size("bert_embedding").alias("dim")).show(5, truncate=False)

# === 2) Dimension consistency (should be 384) ===
(emb.select(F.size("bert_embedding").alias("dim"))
    .groupBy("dim").count()
    .orderBy("dim")
    .show())

# === 3) Null / empty check ===
bad = emb.where(F.col("bert_embedding").isNull() | (F.size("bert_embedding") == 0)).count()
print(f"Null or empty embeddings: {bad}")

# === 4) Count verification (compare to train split) ===
train = spark.read.format("delta").load(f"{gold_path}/features_v2/train")
src_cnt = train.select("review_id").distinct().count()
emb_cnt = emb.select("review_id").distinct().count()
print(f"Train reviews: {src_cnt},  Embeddings: {emb_cnt},  Missing: {src_cnt - emb_cnt}")

# === 5) L2 norm sanity (should be ~1.0) ===
def l2(v): return F.sqrt(F.aggregate(F.transform(v, lambda x: x*x), F.lit(0.0), lambda a,b: a+b))
(emb.select(F.round(l2(F.col("bert_embedding")), 4).alias("l2"))
     .summary("count","min","max","mean","stddev")
     .show())

root
 |-- review_id: string (nullable = true)
 |-- clean_text: string (nullable = true)
 |-- bert_embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)

+--------------------------------+---+
|review_id                       |dim|
+--------------------------------+---+
|04e795313e75aa94de1de3d704b93d68|384|
|04e7ede2bbcc39b621a8740503da1bd7|384|
|04e83a02de5f3c80d85e49d4180514c3|384|
|04e8864ccd13fe8d0923e8a45553f2f5|384|
|04e8cf27986dc0bac71cefc9a0aa42fb|384|
+--------------------------------+---+
only showing top 5 rows
+---+--------+
|dim|   count|
+---+--------+
|384|10942623|
+---+--------+

Null or empty embeddings: 0
Train reviews: 10480029,  Embeddings: 10442623,  Missing: 37406
+-------+--------+
|summary|      l2|
+-------+--------+
|  count|10942623|
|    min|     1.0|
|    max|     1.0|
|   mean|     1.0|
| stddev|     0.0|
+-------+--------+



In [0]:
base = f"{gold_path}/features_v2"
train_ids = spark.read.format("delta").load(f"{base}/train").select("review_id").distinct()
print("train_ids:", train_ids.count())

train_ids: 10480029


In [0]:
# Source SBERT (already computed)
emb_all = spark.read.format("delta").load(f"{base}/text_embeddings_sbert").select("review_id","bert_embedding")

# Keep only train rows and persist
train_emb = train_ids.join(emb_all, "review_id", "inner")
train_emb.write.mode("overwrite").format("delta").save(f"{base}/train_text_embeddings_sbert")

# Verify
x = spark.read.format("delta").load(f"{base}/train_text_embeddings_sbert")
print({"train_ids": train_ids.count(), "train_emb": x.select("review_id").distinct().count()})

{'train_ids': 10480029, 'train_emb': 10480029}


In [0]:
tfidf_train = spark.read.format("delta").load(f"{base}/train_text_tfidf").select("review_id","tfidf")
print("tfidf_train:", tfidf_train.count())


tfidf_train: 10442623


In [0]:
basic_all = spark.read.format("delta").load(f"{base}/text_basic") \
                     .select("review_id","review_length_words","review_length_chars")
sent_all  = spark.read.format("delta").load(f"{base}/text_sentiment") \
                     .select("review_id","sentiment_pos","sentiment_neu","sentiment_neg","sentiment_compound")

basic_train = train_ids.join(basic_all, "review_id", "inner")
sent_train  = train_ids.join(sent_all,  "review_id", "inner")

basic_train.write.mode("overwrite").format("delta").save(f"{base}/train_text_basic")
sent_train.write.mode("overwrite").format("delta").save(f"{base}/train_text_sentiment")

print({
  "basic_train": spark.read.format("delta").load(f"{base}/train_text_basic").count(),
  "sent_train":  spark.read.format("delta").load(f"{base}/train_text_sentiment").count()
})


{'basic_train': 10442623, 'sent_train': 10442623}


In [0]:
keys = spark.read.format("delta").load(f"{base}/train").select("review_id","book_id","rating")

train_emb   = spark.read.format("delta").load(f"{base}/train_text_embeddings_sbert")
tfidf_train = spark.read.format("delta").load(f"{base}/train_text_tfidf")
basic_train = spark.read.format("delta").load(f"{base}/train_text_basic")
sent_train  = spark.read.format("delta").load(f"{base}/train_text_sentiment")

train_features = (keys
    .join(tfidf_train, "review_id", "left")
    .join(train_emb,   "review_id", "left")
    .join(basic_train, "review_id", "left")
    .join(sent_train,  "review_id", "left"))

out = f"{base}/train_features_v2_all"
train_features.write.mode("overwrite").format("delta").save(out)

# Verifications
df = spark.read.format("delta").load(out)
n_keys = keys.select("review_id").distinct().count()
n_rows = df.select("review_id").distinct().count()
null_sbert = df.where(df.bert_embedding.isNull()).count()
null_tfidf = df.where(df.tfidf.isNull()).count()
print({"keys": n_keys, "final_rows": n_rows, "null_sbert": null_sbert, "null_tfidf": null_tfidf})


{'keys': 10480029, 'final_rows': 10480029, 'null_sbert': 0, 'null_tfidf': 37406}


In [0]:
# === Verify record counts across all Lab 4 tables ===
from pyspark.sql import DataFrame

gold = f"{gold_path}/features_v2"

tables = {
    "train": f"{gold}/train",
    "text_cleaned": f"{gold}/text_cleaned",
    "text_basic": f"{gold}/text_basic",
    "text_sentiment": f"{gold}/text_sentiment",
    "train_text_tfidf": f"{gold}/train_text_tfidf",
    "text_embeddings_sbert": f"{gold}/text_embeddings_sbert",
    "train_text_embeddings_sbert": f"{gold}/train_text_embeddings_sbert",  # if created
    "train_text_basic": f"{gold}/train_text_basic",                        # if created
    "train_text_sentiment": f"{gold}/train_text_sentiment",                # if created
    "train_features_v2_all": f"{gold}/train_features_v2_all"               # final merged
}

expected = 10_442_623
results = {}

for name, path in tables.items():
    try:
        df = spark.read.format("delta").load(path)
        n = df.select("review_id").distinct().count()
        results[name] = n
    except Exception as e:
        results[name] = f"⚠️ Not found or unreadable ({str(e).splitlines()[0]})"

print("=== Record count verification ===")
for k,v in results.items():
    if isinstance(v, int):
        print(f"{k:30s} → {v:,} {'✅' if v == expected else '❌'}")
    else:
        print(f"{k:30s} → {v}")

# Optional summary: highlight mismatches
mismatch = {k:v for k,v in results.items() if isinstance(v,int) and v != expected}
print("\nMismatched tables:", mismatch if mismatch else "None (all good)")

=== Record count verification ===
train                          → 10,480,029 ❌
text_cleaned                   → 10,442,623 ✅
text_basic                     → 10,442,623 ✅
text_sentiment                 → 10,442,623 ✅
train_text_tfidf               → 10,442,623 ✅
text_embeddings_sbert          → 10,480,029 ❌
train_text_embeddings_sbert    → 10,480,029 ❌
train_text_basic               → 10,442,623 ✅
train_text_sentiment           → 10,442,623 ✅
train_features_v2_all          → 10,480,029 ❌

Mismatched tables: {'train': 10480029, 'text_embeddings_sbert': 10480029, 'train_text_embeddings_sbert': 10480029, 'train_features_v2_all': 10480029}


In [0]:
base = f"{gold_path}/features_v2"

cleaned_ids = (spark.read.format("delta").load(f"{base}/text_cleaned")
               .select("review_id").distinct())
train_ids   = (spark.read.format("delta").load(f"{base}/train")
               .select("review_id").distinct())

train_clean_ids = train_ids.join(cleaned_ids, "review_id", "inner").distinct()
train_clean_ids.write.mode("overwrite").format("delta").save(f"{base}/train_clean_ids")

print("train_clean_ids:", train_clean_ids.count())  # expect 10,442,623


train_clean_ids: 10442623


In [0]:
emb_all   = spark.read.format("delta").load(f"{base}/text_embeddings_sbert") \
                      .select("review_id","bert_embedding")
train_emb = train_clean_ids.join(emb_all, "review_id", "inner")

train_emb.write.mode("overwrite").format("delta").save(f"{base}/train_text_embeddings_sbert_clean")

# verify
print("train_text_embeddings_sbert_clean:",
      spark.read.format("delta").load(f"{base}/train_text_embeddings_sbert_clean")
           .select("review_id").distinct().count())


train_text_embeddings_sbert_clean: 10442623


In [0]:
keys   = train_clean_ids.alias("k")

tfidf  = spark.read.format("delta").load(f"{base}/train_text_tfidf") \
                   .select("review_id","tfidf")
sbert  = spark.read.format("delta").load(f"{base}/train_text_embeddings_sbert_clean") \
                   .select("review_id","bert_embedding")
basic  = spark.read.format("delta").load(f"{base}/train_text_basic") \
                   .select("review_id","review_length_words","review_length_chars")
sent   = spark.read.format("delta").load(f"{base}/train_text_sentiment") \
                   .select("review_id","sentiment_pos","sentiment_neu","sentiment_neg","sentiment_compound")

final = (keys
         .join(tfidf, "review_id", "left")
         .join(sbert, "review_id", "left")
         .join(basic, "review_id", "left")
         .join(sent,  "review_id", "left"))

final.write.mode("overwrite").format("delta").save(f"{base}/train_features_v2_all")

# verify = 10,442,623
df = spark.read.format("delta").load(f"{base}/train_features_v2_all")
print({"rows": df.count(),
       "distinct_review_ids": df.select("review_id").distinct().count()})


{'rows': 10442623, 'distinct_review_ids': 10442623}


In [0]:
# === Verify counts (cleaned-train universe) ===
from pyspark.sql import functions as F

gold = f"{gold_path}/features_v2"

tables_cleaned = {
    "train_clean_ids":               f"{gold}/train_clean_ids",                  # created earlier
    "text_cleaned":                  f"{gold}/text_cleaned",
    "train_text_basic":              f"{gold}/train_text_basic",
    "train_text_sentiment":          f"{gold}/train_text_sentiment",
    "train_text_tfidf":              f"{gold}/train_text_tfidf",
    "train_text_embeddings_sbert":   f"{gold}/train_text_embeddings_sbert_clean",  # filtered SBERT
    "train_features_v2_all":         f"{gold}/train_features_v2_all"
}

expected = 10_442_623
results = {}

for name, path in tables_cleaned.items():
    try:
        df = spark.read.format("delta").load(path)
        n = df.select("review_id").distinct().count()
        results[name] = n
    except Exception as e:
        results[name] = f"⚠️ Not found or unreadable ({str(e).splitlines()[0]})"

print("=== Cleaned-train verification (expected = 10,442,623) ===")
for k, v in results.items():
    if isinstance(v, int):
        print(f"{k:35s} → {v:,} {'✅' if v == expected else '❌'}")
    else:
        print(f"{k:35s} → {v}")

mismatch = {k:v for k,v in results.items() if isinstance(v,int) and v != expected}
print("\nMismatched (cleaned-train):", mismatch if mismatch else "None (all good)")

# --- Optional: report raw-train and unfiltered SBERT for context ---
try:
    raw_train = spark.read.format("delta").load(f"{gold}/train").select("review_id").distinct().count()
    print("\nRaw train (pre-clean) →", f"{raw_train:,}")
except:
    pass

try:
    sbert_all = spark.read.format("delta").load(f"{gold}/text_embeddings_sbert") \
                          .select("review_id").distinct().count()
    print("Unfiltered SBERT table →", f"{sbert_all:,}")
except:
    pass


=== Cleaned-train verification (expected = 10,442,623) ===
train_clean_ids                     → 10,442,623 ✅
text_cleaned                        → 10,442,623 ✅
train_text_basic                    → 10,442,623 ✅
train_text_sentiment                → 10,442,623 ✅
train_text_tfidf                    → 10,442,623 ✅
train_text_embeddings_sbert         → 10,442,623 ✅
train_features_v2_all               → 10,442,623 ✅

Mismatched (cleaned-train): None (all good)

Raw train (pre-clean) → 10,480,029
Unfiltered SBERT table → 10,480,029


### **III.4.e. Additional features**

# **IV. Combined Feature Set and Output**

### **final check for all tables before combining**

In [0]:
# === Final verification of all Lab 4 train tables (pre-join) ===
from pyspark.sql import functions as F
from pyspark.ml.linalg import VectorUDT

base = f"{gold_path}/features_v2"

tables = {
    "train_clean_ids":              f"{base}/train_clean_ids",
    "text_cleaned":                 f"{base}/text_cleaned",
    "text_basic":                   f"{base}/text_basic",
    "text_sentiment":               f"{base}/text_sentiment",
    "train_text_tfidf":             f"{base}/train_text_tfidf",
    "train_text_embeddings_sbert_clean": f"{base}/train_text_embeddings_sbert_clean"
}

expected = 10_442_623
results = {}

for name, path in tables.items():
    try:
        df = spark.read.format("delta").load(path)
        n = df.select("review_id").distinct().count()
        results[name] = n
    except Exception as e:
        results[name] = f"⚠️ Missing or unreadable ({str(e).splitlines()[0]})"

print("=== Record count verification (should all equal 10,442,623) ===")
for k, v in results.items():
    if isinstance(v, int):
        print(f"{k:40s} → {v:,} {'✅' if v == expected else '❌'}")
    else:
        print(f"{k:40s} → {v}")

# Optional: sanity type check for vectorized features
for name in ["train_text_tfidf", "train_text_embeddings_sbert_clean"]:
    try:
        df = spark.read.format("delta").load(tables[name])
        col = "tfidf" if "tfidf" in name else "bert_embedding"
        print(f"{name}: VectorUDT? →", isinstance(df.schema[col].dataType, VectorUDT))
    except Exception:
        pass

=== Record count verification (should all equal 10,442,623) ===
train_clean_ids                          → 10,442,623 ✅
text_cleaned                             → 10,442,623 ✅
text_basic                               → 10,442,623 ✅
text_sentiment                           → 10,442,623 ✅
train_text_tfidf                         → 10,442,623 ✅
train_text_embeddings_sbert_clean        → 10,442,623 ✅
train_text_tfidf: VectorUDT? → True
train_text_embeddings_sbert_clean: VectorUDT? → False


In [0]:
# === Verify SBERT VectorUDT column (dimension, nulls, L2) ===
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.linalg import SparseVector, DenseVector

src = f"{gold_path}/features_v2/train_text_embeddings_sbert_clean"
emb = spark.read.format("delta").load(src).select("review_id","bert_embedding")

# Try fast path (vector_to_array). If unavailable, fall back to UDFs.
use_fast = True
try:
    from pyspark.ml.functions import vector_to_array  # Spark 3.4+ / Databricks
except Exception:
    use_fast = False

if use_fast:
    # Dimension
    (emb.select(F.size(vector_to_array("bert_embedding")).alias("dim"))
        .groupBy("dim").count().show())
    # Nulls
    print("null embeddings:", emb.where(F.col("bert_embedding").isNull()).count())
    # L2 norm (approx 1.0 if normalized)
    (emb.select(
         F.sqrt(F.aggregate(vector_to_array("bert_embedding"), F.lit(0.0), lambda acc, x: acc + x*x)).alias("l2")
     ).summary("count","min","max","mean","stddev").show())
else:
    # Fallback UDFs
    @F.udf(IntegerType())
    def vec_dim(v):
        if v is None: return None
        return int(v.size)

    @F.udf(DoubleType())
    def vec_l2(v):
        if v is None: return None
        if isinstance(v, SparseVector):
            s = sum(x*x for x in v.values)
        else:
            s = sum(x*x for x in v.toArray())
        return float(s ** 0.5)

    (emb.select(vec_dim("bert_embedding").alias("dim"))
        .groupBy("dim").count().show())
    print("null embeddings:", emb.where(F.col("bert_embedding").isNull()).count())
    (emb.select(vec_l2("bert_embedding").alias("l2"))
        .summary("count","min","max","mean","stddev").show())

+---+--------+
|dim|   count|
+---+--------+
|384|10442623|
+---+--------+

null embeddings: 0
+-------+--------------------+
|summary|                  l2|
+-------+--------------------+
|  count|            10442623|
|    min|  0.9999998469899147|
|    max|  1.0000001684440933|
|   mean|  1.0000000285194255|
| stddev|3.532389103847259E-8|
+-------+--------------------+



In [0]:
# === Final verification of all Lab 4 train tables (pre-join) ===
from pyspark.sql import functions as F
from pyspark.ml.linalg import VectorUDT

base = f"{gold_path}/features_v2"

tables = {
    "train_clean_ids":              f"{base}/train_clean_ids",
    "text_cleaned":                 f"{base}/text_cleaned",
    "text_basic":                   f"{base}/text_basic",
    "text_sentiment":               f"{base}/text_sentiment",
    "train_text_tfidf":             f"{base}/train_text_tfidf",
    "train_text_embeddings_sbert_clean": f"{base}/train_text_embeddings_sbert_clean"
}

expected = 10_442_623
results = {}

for name, path in tables.items():
    try:
        df = spark.read.format("delta").load(path)
        n = df.select("review_id").distinct().count()
        results[name] = n
    except Exception as e:
        results[name] = f"⚠️ Missing or unreadable ({str(e).splitlines()[0]})"

print("=== Record count verification (should all equal 10,442,623) ===")
for k, v in results.items():
    if isinstance(v, int):
        print(f"{k:40s} → {v:,} {'✅' if v == expected else '❌'}")
    else:
        print(f"{k:40s} → {v}")

# Optional: sanity type check for vectorized features
for name in ["train_text_tfidf", "train_text_embeddings_sbert_clean"]:
    try:
        df = spark.read.format("delta").load(tables[name])
        col = "tfidf" if "tfidf" in name else "bert_embedding"
        print(f"{name}: VectorUDT? →", isinstance(df.schema[col].dataType, VectorUDT))
    except Exception:
        pass

=== Record count verification (should all equal 10,442,623) ===
train_clean_ids                          → 10,442,623 ✅
text_cleaned                             → 10,442,623 ✅
text_basic                               → 10,442,623 ✅
text_sentiment                           → 10,442,623 ✅
train_text_tfidf                         → 10,442,623 ✅
train_text_embeddings_sbert_clean        → 10,442,623 ✅
train_text_tfidf: VectorUDT? → True
train_text_embeddings_sbert_clean: VectorUDT? → True


In [0]:
# === V. COMBINED FEATURE SET (TRAIN-ONLY) ===
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.feature import VectorAssembler

base = f"{gold_path}/features_v2"

# 0) Canonical ID set (cleaned train)
ids = spark.read.format("delta").load(f"{base}/train_clean_ids").select("review_id").distinct()
expected = ids.count()

# 1) Metadata (review_id, book_id, rating) from raw train, restricted to cleaned IDs
train_keys = (spark.read.format("delta").load(f"{base}/train")
              .select("review_id","book_id","rating")
              .join(ids, "review_id", "inner"))

# 2) Feature sources (all already verified to be 10,442,623 and VectorUDT where needed)
tfidf  = spark.read.format("delta").load(f"{base}/train_text_tfidf") \
                   .select("review_id","tfidf")
sbert  = spark.read.format("delta").load(f"{base}/train_text_embeddings_sbert_clean") \
                   .select("review_id","bert_embedding")
basic  = spark.read.format("delta").load(f"{base}/text_basic") \
                   .select("review_id","review_length_words","review_length_chars") \
                   .join(ids, "review_id", "inner")
sent   = spark.read.format("delta").load(f"{base}/text_sentiment") \
                   .select("review_id","sentiment_pos","sentiment_neu","sentiment_neg","sentiment_compound") \
                   .join(ids, "review_id", "inner")

# 3) Join → wide table
wide = (train_keys
        .join(tfidf, "review_id", "inner")
        .join(sbert, "review_id", "inner")
        .join(basic, "review_id", "inner")
        .join(sent,  "review_id", "inner"))

# 4) Assertions before assemble
n_rows = wide.select("review_id").distinct().count()
null_checks = {
    "tfidf_nulls":  wide.where(F.col("tfidf").isNull()).count(),
    "bert_nulls":   wide.where(F.col("bert_embedding").isNull()).count(),
    "lenw_nulls":   wide.where(F.col("review_length_words").isNull()).count(),
    "lenc_nulls":   wide.where(F.col("review_length_chars").isNull()).count(),
    "sent_nulls":   wide.where(F.col("sentiment_pos").isNull() |
                               F.col("sentiment_neu").isNull() |
                               F.col("sentiment_neg").isNull() |
                               F.col("sentiment_compound").isNull()).count()
}
print({"expected_ids": expected, "wide_rows": n_rows, **null_checks})
assert n_rows == expected and all(v == 0 for v in null_checks.values()), "Pre-assemble check failed."

# 5) Assemble one feature vector (sparse, no densification)
assembler = VectorAssembler(
    inputCols=[
        "review_length_words", "review_length_chars",
        "sentiment_pos", "sentiment_neu", "sentiment_neg", "sentiment_compound",
        "tfidf", "bert_embedding"
    ],
    outputCol="features"
)
assembled = assembler.transform(wide).select(
    "review_id","book_id","rating",  # metadata/label
    "review_length_words","review_length_chars",
    "sentiment_pos","sentiment_neu","sentiment_neg","sentiment_compound",
    "tfidf","bert_embedding","features"
)

# 6) Dimension sanity (no densify)
@F.udf(IntegerType())
def vec_dim(v): 
    return int(v.size) if v is not None else None

dims = (assembled
        .select(vec_dim("tfidf").alias("tf_dim"),
                vec_dim("bert_embedding").alias("bert_dim"),
                vec_dim("features").alias("feat_dim"))
        .limit(1)
        .collect()[0])
print({"tf_dim": dims["tf_dim"], "bert_dim": dims["bert_dim"], "features_dim": dims["feat_dim"]})
# expected features_dim = 6 (numerics) + tf_dim + bert_dim
assert dims["feat_dim"] == 6 + dims["tf_dim"] + dims["bert_dim"], "Assembled vector dimension mismatch."

# 7) Write outputs
out_wide   = f"{base}/train_features_v2_all"        # wide with separate columns
out_matrix = f"{base}/train_features_v2_matrix"     # single assembled vector

(assembled
 .repartition(128)
 .write.mode("overwrite").option("overwriteSchema","true").format("delta").save(out_wide))

(assembled
 .select("review_id","book_id","rating","features")
 .repartition(128)
 .write.mode("overwrite").option("overwriteSchema","true").format("delta").save(out_matrix))

# 8) Final verification
final_wide   = spark.read.format("delta").load(out_wide)
final_matrix = spark.read.format("delta").load(out_matrix)

print({
    "final_wide_rows":   final_wide.select("review_id").distinct().count(),
    "final_matrix_rows": final_matrix.select("review_id").distinct().count(),
    "features_is_vector": isinstance(final_matrix.schema["features"].dataType, VectorUDT)
})

{'expected_ids': 10442623, 'wide_rows': 10442623, 'tfidf_nulls': 0, 'bert_nulls': 0, 'lenw_nulls': 0, 'lenc_nulls': 0, 'sent_nulls': 0}
{'tf_dim': 200000, 'bert_dim': 384, 'features_dim': 200390}
{'final_wide_rows': 10442623, 'final_matrix_rows': 10442623, 'features_is_vector': True}


In [0]:
# === Load combined train features and run essential checks ===
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.linalg import VectorUDT, SparseVector, DenseVector

base = f"{gold_path}/features_v2"
wide_path   = f"{base}/train_features_v2_all"
matrix_path = f"{base}/train_features_v2_matrix"

# 0) Expected ID universe (cleaned-train)
expected_ids = spark.read.format("delta").load(f"{base}/train_clean_ids").select("review_id").distinct()
expected = expected_ids.count()

# 1) Load outputs
wide   = spark.read.format("delta").load(wide_path)
matrix = spark.read.format("delta").load(matrix_path)

# 2) Row parity & schema basics
wide_rows   = wide.select("review_id").distinct().count()
matrix_rows = matrix.select("review_id").distinct().count()

print({
    "expected_ids": expected,
    "wide_rows": wide_rows,
    "matrix_rows": matrix_rows,
    "wide_has_cols": all(c in wide.columns for c in [
        "review_id","book_id","rating",
        "review_length_words","review_length_chars",
        "sentiment_pos","sentiment_neu","sentiment_neg","sentiment_compound",
        "tfidf","bert_embedding","features"
    ]),
    "matrix_has_cols": all(c in matrix.columns for c in ["review_id","book_id","rating","features"]),
    "features_is_vector": isinstance(matrix.schema["features"].dataType, VectorUDT),
    "tfidf_is_vector": isinstance(wide.schema["tfidf"].dataType, VectorUDT),
    "bert_is_vector": isinstance(wide.schema["bert_embedding"].dataType, VectorUDT),
})
assert wide_rows == expected and matrix_rows == expected, "Row mismatch vs expected cleaned-train IDs."
assert isinstance(matrix.schema["features"].dataType, VectorUDT), "features must be VectorUDT."
assert isinstance(wide.schema["tfidf"].dataType, VectorUDT), "tfidf must be VectorUDT."
assert isinstance(wide.schema["bert_embedding"].dataType, VectorUDT), "bert_embedding must be VectorUDT."

# 3) Null/NaN checks (no leakage)
num_nulls = {
    "tfidf_nulls":  wide.where(F.col("tfidf").isNull()).count(),
    "bert_nulls":   wide.where(F.col("bert_embedding").isNull()).count(),
    "features_nulls": matrix.where(F.col("features").isNull()).count(),
    "rating_nulls": wide.where(F.col("rating").isNull()).count(),
}
print(num_nulls)
assert all(v == 0 for v in num_nulls.values()), "Nulls detected in critical columns."

# 4) Vector dimensions and assembled dimension check
try:
    from pyspark.ml.functions import vector_to_array  # Spark 3.4+/DBR 13+
    tf_dim   = wide.select(F.size(vector_to_array("tfidf")).alias("d")).first()["d"]
    bert_dim = wide.select(F.size(vector_to_array("bert_embedding")).alias("d")).first()["d"]
    feat_dim = matrix.select(F.size(vector_to_array("features")).alias("d")).first()["d"]
except Exception:
    @F.udf(IntegerType())
    def vec_dim(v):
        return int(v.size) if v is not None else None
    tf_dim   = wide.select(vec_dim("tfidf").alias("d")).first()["d"]
    bert_dim = wide.select(vec_dim("bert_embedding").alias("d")).first()["d"]
    feat_dim = matrix.select(vec_dim("features").alias("d")).first()["d"]

print({"tf_dim": tf_dim, "bert_dim": bert_dim, "features_dim": feat_dim})
# 6 scalar numerics: 2 lengths + 4 sentiments
assert feat_dim == (6 + tf_dim + bert_dim), "Assembled features dimension mismatch."

# 5) Sparsity & normalization sanity (cheap; no densify)
@F.udf(IntegerType())
def nnz(v):
    if v is None: return 0
    if isinstance(v, SparseVector): return int(len(v.indices))
    # Dense fallback
    return int(sum(1 for x in v if x != 0.0))

tf_nnz = (wide.select(nnz("tfidf").alias("nnz"))
               .summary("count","min","max","mean","stddev").toPandas())
print("TF-IDF nnz summary:\n", tf_nnz)

@F.udf(DoubleType())
def vec_l2(v):
    if v is None: return None
    if isinstance(v, SparseVector):
        s = sum(x*x for x in v.values)
    else:
        s = sum(float(x)*float(x) for x in v.toArray())
    return float(s ** 0.5)

bert_l2 = (wide.select(vec_l2("bert_embedding").alias("l2"))
                .summary("count","min","max","mean","stddev").toPandas())
print("SBERT l2 summary (~1.0 if normalized):\n", bert_l2)

# 6) Label sanity (distribution of rating)
label_dist = (wide.groupBy("rating").count().orderBy("rating"))
label_dist.show(10, truncate=False)

# 7) Cross-table ID equality (wide ↔ matrix)
eq = (wide.select("review_id").distinct()
           .join(matrix.select("review_id").distinct(), "review_id", "inner").count())
print({"id_intersection": eq, "ok": eq == expected})

# 8) Tiny peek
wide.select("review_id","book_id","rating","review_length_words","tfidf","bert_embedding").limit(1).show(truncate=False)
matrix.select("review_id","features").limit(1).show(truncate=False)

print("✅ Combined feature tables loaded and validated.")

{'expected_ids': 10442623, 'wide_rows': 10442623, 'matrix_rows': 10442623, 'wide_has_cols': True, 'matrix_has_cols': True, 'features_is_vector': True, 'tfidf_is_vector': True, 'bert_is_vector': True}
{'tfidf_nulls': 0, 'bert_nulls': 0, 'features_nulls': 0, 'rating_nulls': 0}
{'tf_dim': 200000, 'bert_dim': 384, 'features_dim': 200390}
TF-IDF nnz summary:
   summary                nnz
0   count           10442623
1     min                  0
2     max               2975
3    mean  160.0799581675983
4  stddev  185.8400216649324
SBERT l2 summary (~1.0 if normalized):
   summary                     l2
0   count               10442623
1     min     0.9999998469899155
2     max     1.0000001684440936
3    mean     1.0000000285194253
4  stddev  3.5323891041696534E-8
+------+-------+
|rating|count  |
+------+-------+
|1     |306945 |
|2     |761063 |
|3     |2135260|
|4     |3613283|
|5     |3626072|
+------+-------+

{'id_intersection': 10442623, 'ok': True}
+--------------------------------+-

In [0]:
# === Show all columns and confirm completeness of combined feature table ===
combined_path = f"{gold_path}/features_v2/train_features_v2_all"
df = spark.read.format("delta").load(combined_path)

# 1) Show schema (all columns, data types)
df.printSchema()

# 2) List columns explicitly
cols = df.columns
print("Total columns:", len(cols))
print("All columns:\n", cols)

# 3) Check for all required core feature columns
expected_cols = [
    "review_id","book_id","rating",
    "review_length_words","review_length_chars",
    "sentiment_pos","sentiment_neu","sentiment_neg","sentiment_compound",
    "tfidf","bert_embedding","features"
]

missing = [c for c in expected_cols if c not in cols]
extra   = [c for c in cols if c not in expected_cols]

print("\n=== Column completeness check ===")
print("Missing columns:", missing if missing else "None ✅")
print("Extra columns:", extra if extra else "None ✅")

# 4) Quick row + null verification
row_count = df.count()
nulls = {c: df.where(F.col(c).isNull()).count() for c in expected_cols if c in df.columns}

print("\n=== Summary ===")
print(f"Row count → {row_count:,}")
print("Null counts in critical columns:", nulls)

# 5) Optional: quick sample to visually confirm structure
df.select("review_id","book_id","rating","tfidf","bert_embedding","features").show(2, truncate=False)


root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- review_length_chars: integer (nullable = true)
 |-- sentiment_pos: double (nullable = true)
 |-- sentiment_neu: double (nullable = true)
 |-- sentiment_neg: double (nullable = true)
 |-- sentiment_compound: double (nullable = true)
 |-- tfidf: vector (nullable = true)
 |-- bert_embedding: vector (nullable = true)
 |-- features: vector (nullable = true)

Total columns: 12
All columns:
 ['review_id', 'book_id', 'rating', 'review_length_words', 'review_length_chars', 'sentiment_pos', 'sentiment_neu', 'sentiment_neg', 'sentiment_compound', 'tfidf', 'bert_embedding', 'features']

=== Column completeness check ===
Missing columns: None ✅
Extra columns: None ✅

=== Summary ===
Row count → 10,442,623
Null counts in critical columns: {'review_id': 0, 'book_id': 0, 'rating': 0, 'review_length_words': 0, 'review_length