In [0]:
# Replace with your actual storage account name and key 
spark.conf.set( 
    "fs.azure.account.key.goodreadsreviews60301813.dfs.core.windows.net", 
    "sI48e0EntAgjv6jZU0Rhqmk2WWR0vsfHYc/urFlpLKbOusdGyHZgpJjOG8ek5VCCktAYpYjh49PC+AStvm6tbg==" 
) 

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

# Step 1: Reload clean_reviews (in case cache is stale)
clean_reviews = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/reviews/"
)

# Step 2: Load other tables (make sure these exist!)
books = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/books/"
)
authors = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/authors/"
)
book_authors = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/book_authors/"
)

# Step 3: Curate with homework filters
curated_reviews = (
    clean_reviews.alias("r")
    .join(book_authors.alias("ba"), "book_id", "inner")
    .join(authors.alias("a"), "author_id", "inner")
    .join(books.alias("b"), "book_id", "inner")
    # Filter conditions (as in lab instructions)
    .filter(
        (F.col("b.publication_year").isin("2014", "2015")) &
        (F.col("b.average_rating").cast(DoubleType()) > 4.0) &
        (F.col("b.ratings_count").cast(IntegerType()) >= 100)
    )
    .select(
        F.col("r.review_id"),
        F.col("r.book_id"),
        F.col("b.title"),
        F.col("a.author_id"),
        F.col("a.name").alias("author_name"),
        F.col("r.user_id"),
        F.col("r.rating"),
        F.col("r.review_text"),
        F.col("b.language_code").alias("language"),
        # Optional enrichment fields
        F.col("b.average_rating").cast(DoubleType()).alias("book_avg_rating"),
        F.col("b.ratings_count").cast(IntegerType()).alias("book_ratings_count"),
        F.length("r.review_text").alias("review_length")
    )
)

# Display verification
print("✅ Curated dataset ready for homework!")
curated_reviews.printSchema()
curated_reviews.show(10, truncate=False)
print(f"Total filtered reviews (2014-2015, high-rated books): {curated_reviews.count():,}")


✅ Curated dataset ready for homework!
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language: string (nullable = true)
 |-- book_avg_rating: double (nullable = true)
 |-- book_ratings_count: integer (nullable = true)
 |-- review_length: integer (nullable = true)

+--------------------------------+--------+-----------------------------------------+---------+----------------+--------------------------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
output_path = "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/curated/reviews_2014_2015_highrated/"

curated_reviews \
    .coalesce(10) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .parquet(output_path)

print(f"Saved to: {output_path}")

Saved to: abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/curated/reviews_2014_2015_highrated/


In [0]:
# Load the books dataset from the silver layer 
books = spark.read.parquet( 
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/books/" 
) 
 
# Load the authors dataset from the silver layer 
authors = spark.read.parquet( 
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/authors/" 
) 
# Display the first few records to confirm the data was loaded correctly 
books.show(5) 
authors.show(5) 
# Display the columns and their data types to verify the schema 
books.printSchema() 
authors.printSchema() 

+----------+------------------+------------+-------------+----------+--------+--------------+-----------+--------------------+---------+--------------------+--------------------+---------+---------------+-------------+-----------------+-------------------+----------------+--------------------+--------------------+-------+-------------+-------+--------------------+--------------------+
|      isbn|text_reviews_count|country_code|language_code|      asin|is_ebook|average_rating|kindle_asin|         description|   format|                link|           publisher|num_pages|publication_day|       isbn13|publication_month|edition_information|publication_year|                 url|           image_url|book_id|ratings_count|work_id|               title|title_without_series|
+----------+------------------+------------+-------------+----------+--------+--------------+-----------+--------------------+---------+--------------------+--------------------+---------+---------------+-------------+------

In [0]:
from pyspark.sql.functions import col, length, trim, count, when

# Read raw (uncleaned) reviews from the silver layer 
reviews = spark.read.parquet( 
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/reviews/" 
) 
 

# Peek at rows and schema
reviews.show(5, truncate=False)
reviews.printSchema()

# Basic profiling: count total and potential issues
total_rows = reviews.count()
null_review_id = reviews.filter(col("review_id").isNull()).count()
null_book_id = reviews.filter(col("book_id").isNull()).count()
null_user_id = reviews.filter(col("user_id").isNull()).count()
null_rating = reviews.filter(col("rating").isNull()).count()
empty_text = reviews.filter((col("review_text").isNull()) | (trim(col("review_text")) == "")).count()

print(f"Total rows: {total_rows}")
print(f"Null review_id: {null_review_id}, Null book_id: {null_book_id}, Null user_id: {null_user_id}, Null rating: {null_rating}")
print(f"Empty/Null review_text: {empty_text}")


+--------------------------------+--------+--------------------------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql.functions import col, length, trim

# Read reviews again from processed zone (silver)
reviews = spark.read.parquet("abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/reviews/")

# Drop rows missing critical keys
clean_reviews = reviews.filter(
    col("review_id").isNotNull() &
    col("book_id").isNotNull() &
    col("user_id").isNotNull() &
    col("rating").isNotNull()
)

# Drop reviews that are blank or too short
clean_reviews = clean_reviews.filter(
    trim(col("review_text")) != ""
).filter(length(col("review_text")) > 30)

# Drop any invalid ratings (outside 0–5 range)
clean_reviews = clean_reviews.filter((col("rating") >= 0) & (col("rating") <= 5))

# Show results
print("Before cleaning:", reviews.count())
print("After cleaning:", clean_reviews.count())
clean_reviews.show(5, truncate=False)


Before cleaning: 14325697
After cleaning: 14325697
+--------------------------------+--------+--------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Write the cleaned reviews back to the silver layer (overwrite mode)
clean_reviews.write.mode("overwrite").parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/reviews/"
)

# Sanity check: read from disk and inspect schema + few rows
reviews_verified = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/reviews/"
)

reviews_verified.printSchema()
reviews_verified.show(5, truncate=False)

print("Verified cleaned rows:", reviews_verified.count())


root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)

+--------------------------------+--------+--------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql import functions as F

# Create a minimal book_authors bridge since raw file is missing
print("⚠️ No book_authors.json found — creating a temporary bridge")

# Sample small subsets to avoid long execution
sample_books = books.select("book_id").limit(1000)
sample_authors = authors.select("author_id").limit(1000)

# Create a dummy relationship (1 author per book just for joining)
book_authors = sample_books.crossJoin(sample_authors.limit(1))

# Save it to the processed zone
book_authors.write.mode("overwrite").parquet("abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/book_authors/")

print("✅ Temporary book_authors parquet created at /processed/book_authors/")


⚠️ No book_authors.json found — creating a temporary bridge
✅ Temporary book_authors parquet created at /processed/book_authors/


In [0]:
# ==========================================================
# STEP 2: Curate the Gold Table (Fully Fixed Version)
# ==========================================================

from pyspark.sql import functions as F

# 🧹 1️⃣ Clear Spark cache (avoids stale data issues)
spark.catalog.clearCache()

# 🔄 2️⃣ Reload all required datasets from ADLS Gen2
# Make sure these folders exist in your ADLS under 'processed/'
clean_reviews = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/reviews/"
)

books = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/books/"
)

authors = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/authors/"
)

book_authors = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/book_authors/"
)

# ✅ 3️⃣ Curate the Gold Table
curated_reviews = (
    clean_reviews.alias("r")
    .join(book_authors.alias("ba"), "book_id", "inner")
    .join(authors.alias("a"), "author_id", "inner")
    .join(books.alias("b"), "book_id", "inner")
    .select(
        F.col("r.review_id"),
        F.col("r.book_id"),
        F.col("b.title"),
        F.col("a.author_id"),
        F.col("a.name").alias("author_name"),
        F.col("r.user_id"),
        F.col("r.rating"),
        F.col("r.review_text"),
        # 🔤 Columns that might not exist in the current dataset:
        F.lit(None).alias("language"),      # placeholder for missing column
        F.lit(None).alias("n_votes"),       # placeholder for missing column
        F.lit(None).alias("date_added")     # placeholder for missing column
    )
)

# 🧾 4️⃣ Verify schema and preview first few rows
print("✅ Curated reviews table successfully created!")
curated_reviews.printSchema()
curated_reviews.show(5, truncate=False)

# 💾 5️⃣ Save curated dataset to Silver layer
output_path = "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/silver/curated_reviews/"

curated_reviews.write.format("delta").mode("overwrite").save(output_path)
print(f"✅ Curated dataset saved to: {output_path}")

# 🔍 6️⃣ Reload and verify saved data
curated_check = spark.read.format("delta").load(output_path)
print("✅ Verification: curated_reviews reloaded successfully!")
curated_check.printSchema()
curated_check.show(3, truncate=False)


✅ Curated reviews table successfully created!
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language: void (nullable = true)
 |-- n_votes: void (nullable = true)
 |-- date_added: void (nullable = true)

+--------------------------------+--------+---------------------------------------------------------------------+---------+----------------+--------------------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# ==========================================================
# ✅ FIXED VERSION: Curate and Save Gold Table
# ==========================================================

from pyspark.sql import functions as F

# 🧹 1️⃣ Clear cache to prevent stale data problems
spark.catalog.clearCache()

# 🔄 2️⃣ Reload datasets cleanly from ADLS Gen2
clean_reviews = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/reviews/"
)

books = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/books/"
)

authors = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/authors/"
)

book_authors = spark.read.parquet(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/processed/book_authors/"
)

# ✅ 3️⃣ Join everything together cleanly
# Using ba.author_id ensures we keep a valid author_id column
curated_reviews = (
    clean_reviews.alias("r")
    .join(book_authors.alias("ba"), F.col("r.book_id") == F.col("ba.book_id"), "inner")
    .join(authors.alias("a"), F.col("ba.author_id") == F.col("a.author_id"), "inner")
    .join(books.alias("b"), F.col("r.book_id") == F.col("b.book_id"), "inner")
    .select(
        F.col("r.review_id"),
        F.col("r.book_id"),
        F.col("b.title"),
        F.col("ba.author_id"),              # ✅ keep this one clean version
        F.col("a.name").alias("author_name"),
        F.col("r.user_id"),
        F.col("r.rating"),
        F.col("r.review_text"),
        F.lit(None).alias("language"),      # placeholder for missing data
        F.lit(None).alias("n_votes"),
        F.lit(None).alias("date_added")
    )
)

# 🧾 4️⃣ Verify schema and sample rows
print("✅ Curated table created successfully!")
curated_reviews.printSchema()
curated_reviews.show(5, truncate=False)

# 💾 5️⃣ Save curated DataFrame to the Gold zone
curated_reviews.coalesce(10) \
    .write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .save("abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/")

print("✅ Curated DataFrame successfully saved to Gold Zone!")

# 🔍 6️⃣ Reload to verify
gold_df = spark.read.format("delta").load(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/"
)

print("✅ Verification complete — Gold layer loaded:")
gold_df.printSchema()
gold_df.show(5, truncate=False)
print(f"✅ Total rows: {gold_df.count():,}")


✅ Curated table created successfully!
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language: void (nullable = true)
 |-- n_votes: void (nullable = true)
 |-- date_added: void (nullable = true)

+--------------------------------+--------+---------------------------------------------------------------------+---------+----------------+--------------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Step 4: Verify that the Gold table is correctly saved and readable
verified = spark.read.format("delta").load(
    "abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/"
)

print("✅ Total curated rows:", verified.count())
verified.show(5, truncate=False)


✅ Total curated rows: 7275
+--------------------------------+--------+---------------------------------------------------------------------+---------+----------------+--------------------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
dbutils.fs.ls("abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/")


[FileInfo(path='abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/_delta_log/', name='_delta_log/', size=0, modificationTime=1762597087000),
 FileInfo(path='abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/part-00000-0a419918-42cc-4558-9959-a554c9b3ec17.c000.snappy.parquet', name='part-00000-0a419918-42cc-4558-9959-a554c9b3ec17.c000.snappy.parquet', size=1928290, modificationTime=1762883846000),
 FileInfo(path='abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/part-00000-897f3260-7932-474a-9e6c-effc2b12d34a.c000.snappy.parquet', name='part-00000-897f3260-7932-474a-9e6c-effc2b12d34a.c000.snappy.parquet', size=1442937, modificationTime=1762597417000),
 FileInfo(path='abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/part-00000-99562ddd-31fc-4140-98a4-c5faa958e30b.c000.snappy.parquet', name='part-00000-99562ddd-31fc-41

In [0]:
spark.sql("""
DROP TABLE IF EXISTS hive_metastore.default.curated_reviews
""")

spark.sql("""
CREATE TABLE hive_metastore.default.curated_reviews
USING DELTA
LOCATION 'abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/'
""")


DataFrame[]

In [0]:
spark.sql("""
SELECT review_id, book_id, title, author_id, author_name,
       user_id, rating, review_text
FROM hive_metastore.default.curated_reviews
LIMIT 5
""").show(truncate=False)


+--------------------------------+--------+---------------------------------------------------------------------+---------+----------------+--------------------------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
dbutils.fs.ls("abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/")


[FileInfo(path='abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/_delta_log/', name='_delta_log/', size=0, modificationTime=1762597087000),
 FileInfo(path='abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/part-00000-0a419918-42cc-4558-9959-a554c9b3ec17.c000.snappy.parquet', name='part-00000-0a419918-42cc-4558-9959-a554c9b3ec17.c000.snappy.parquet', size=1928290, modificationTime=1762883846000),
 FileInfo(path='abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/part-00000-897f3260-7932-474a-9e6c-effc2b12d34a.c000.snappy.parquet', name='part-00000-897f3260-7932-474a-9e6c-effc2b12d34a.c000.snappy.parquet', size=1442937, modificationTime=1762597417000),
 FileInfo(path='abfss://lakehouse60301813@goodreadsreviews60301813.dfs.core.windows.net/gold/curated_reviews/part-00000-99562ddd-31fc-4140-98a4-c5faa958e30b.c000.snappy.parquet', name='part-00000-99562ddd-31fc-41