In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, trim, lower, length, split, size, avg, count
)


In [0]:
spark = SparkSession.builder.appName("GoldFeatures").getOrCreate()

In [0]:
spark.conf.set(
    "fs.azure.account.key.goodreadsreviews60104281.dfs.core.windows.net",
    "9RU1c3jp0LFpRyz+fXUyQc4pc+RCoF+DolY45sCFCRkPKM4V4ucxzeXZMM247jLYqEkCXCfGxj6k+AStanhkHA=="
)

In [0]:
print(" Listing containers in your lakehouse:")
display(dbutils.fs.ls("abfss://lakehouse@goodreadsreviews60104281.dfs.core.windows.net/"))


 Listing containers in your lakehouse:


path,name,size,modificationTime
abfss://lakehouse@goodreadsreviews60104281.dfs.core.windows.net/gold/,gold/,0,1762162055000
abfss://lakehouse@goodreadsreviews60104281.dfs.core.windows.net/processed/,processed/,0,1762105788000
abfss://lakehouse@goodreadsreviews60104281.dfs.core.windows.net/raw/,raw/,0,1762086513000


In [0]:
curated_path = "abfss://lakehouse@goodreadsreviews60104281.dfs.core.windows.net/gold/curated_reviews"


curated = spark.read.format("delta").load(curated_path)

print("Loaded curated dataset successfully!")
curated.printSchema()
curated.show(5, truncate=False)

Loaded curated dataset successfully!
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language: string (nullable = true)
 |-- n_votes: long (nullable = true)
 |-- date_added: string (nullable = true)

+--------------------------------+--------+----------------------------------------------------------+---------+-------------------+--------------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
cleaned = curated.filter(
    col("rating").isNotNull() &
    col("review_text").isNotNull() &
    col("book_id").isNotNull() &
    col("author_id").isNotNull()
)

# Remove duplicates (keep first occurrence)
cleaned = cleaned.dropDuplicates(["review_id"])

# Normalize and trim text fields
cleaned = cleaned.withColumn("review_text", trim(lower(col("review_text"))))
cleaned = cleaned.withColumn("title", trim(col("title")))
cleaned = cleaned.withColumn("name", trim(col("name")))

# Remove very short reviews (<10 characters)
cleaned = cleaned.filter(length(col("review_text")) >= 10)

# Fix column data types
cleaned = (
    cleaned.withColumn("rating", col("rating").cast("int"))
           .withColumn("book_id", col("book_id").cast("string"))
           .withColumn("author_id", col("author_id").cast("string"))
           .withColumn("user_id", col("user_id").cast("string"))
           .withColumn("n_votes", col("n_votes").cast("int"))
)

In [0]:
print(" Data cleaning complete.")
cleaned.printSchema()

 Data cleaning complete.
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: string (nullable = true)



In [0]:
# Word count for each review
cleaned = cleaned.withColumn("review_length", size(split(col("review_text"), " ")))

# Aggregate by book_id for average rating and review count
book_features = cleaned.groupBy("book_id").agg(
    avg("rating").alias("avg_rating_per_book"),
    count("*").alias("num_reviews_per_book")
)

# Join features back to main dataset
features = cleaned.join(book_features, on="book_id", how="left")

print("Feature generation complete.")
features.select("book_id", "avg_rating_per_book", "num_reviews_per_book").show(5, truncate=False)

Feature generation complete.
+--------+-------------------+--------------------+
|book_id |avg_rating_per_book|num_reviews_per_book|
+--------+-------------------+--------------------+
|28204534|3.8403805496828753 |946                 |
|23295358|4.857142857142857  |119                 |
|3047848 |3.878504672897196  |214                 |
|27221955|5.0                |4                   |
|31592846|4.0                |1                   |
+--------+-------------------+--------------------+
only showing top 5 rows


In [0]:
# 7 Save Cleaned + Enriched Dataset to Gold Zone
gold_path = "abfss://lakehouse@goodreadsreviews60104281.dfs.core.windows.net/gold/features_v1"

features.write.format("delta").mode("overwrite").save(gold_path)
print(f"✅ Saved final dataset to: {gold_path}")

✅ Saved final dataset to: abfss://lakehouse@goodreadsreviews60104281.dfs.core.windows.net/gold/features_v1


In [0]:
# Verify Saved Gold Table

verify = spark.read.format("delta").load(gold_path)

print(" Verification:")
verify.printSchema()
print(f"Total rows: {verify.count()}")
verify.show(5, truncate=False)

 Verification:
root
 |-- book_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: string (nullable = true)
 |-- review_length: integer (nullable = true)
 |-- avg_rating_per_book: double (nullable = true)
 |-- num_reviews_per_book: long (nullable = true)

Total rows: 14970953
+--------+--------------------------------+---------------------------------------------+---------+----------------+--------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------