In [0]:
import os

# Authenticate Spark to the ADLS Gen2 storage account that hosts the lakehouse.
# The account key is read from the environment (populate it from a secret
# store, e.g. a Databricks secret scope) rather than hardcoded: a key committed
# in a notebook is readable by anyone who can see the file or its history.
# SECURITY: any key that was ever committed to this notebook must be treated as
# leaked and rotated in the Azure portal.
STORAGE_ACCOUNT = "goodreadsreviews60103737"

_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not _account_key:
    raise RuntimeError(
        "AZURE_STORAGE_ACCOUNT_KEY is not set; provide the storage account key "
        "via the environment or a secret scope."
    )

spark.conf.set(
    f"fs.azure.account.key.{STORAGE_ACCOUNT}.dfs.core.windows.net",
    _account_key,
)

In [0]:
# Read the Gold-layer `curated_reviews` Delta table; it is the input to the
# cleaning cell below.
curated_reviews = spark.read.load(
    "abfss://lakehouse@goodreadsreviews60103737.dfs.core.windows.net/gold/curated_reviews/",
    format="delta",
)

# Peek at a few untruncated rows, then at the table's schema.
curated_reviews.show(5, truncate=False)
curated_reviews.printSchema()


+--------------------------------+--------+---------------------------+--------------------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql.functions import col, trim, lower, length, regexp_replace

# Clean the Gold `curated_reviews` table into `df`, which the feature-engineering
# cell consumes. `curated_reviews` itself is never reassigned, so re-running this
# cell yields the same result.
MIN_REVIEW_CHARS = 10  # reviews shorter than this (after cleaning) are dropped

df = curated_reviews

# 1️⃣ Correct data types first. In Spark's default (non-ANSI) mode a value that
#    cannot be cast, e.g. a non-numeric rating, becomes null; doing the casts
#    before the missing-field filter lets that filter remove such rows instead
#    of letting them through with a null rating.
df = (
    df.withColumn("rating", col("rating").cast("int"))
      .withColumn("book_id", col("book_id").cast("string"))
      .withColumn("user_id", col("user_id").cast("string"))
      .withColumn("review_id", col("review_id").cast("string"))
      .withColumn("title", col("title").cast("string"))
)

# 2️⃣ Remove rows with missing critical fields
df = df.filter(
    col("rating").isNotNull() &
    col("review_text").isNotNull() &
    col("book_id").isNotNull() &
    col("review_id").isNotNull() &
    col("user_id").isNotNull()
)

# 3️⃣ Remove duplicates either by review_id or by the same user-book combo.
#    Deduplicating on the combined (review_id, user_id, book_id) key would only
#    drop rows identical in all three columns, so a user's second review of the
#    same book would survive. Two passes handle each kind of duplicate.
#    NOTE: dropDuplicates keeps an arbitrary row from each duplicate group.
df = df.dropDuplicates(["review_id"]).dropDuplicates(["user_id", "book_id"])

# 4️⃣ Normalize text fields: remove non-ASCII chars, THEN trim spaces. Trimming
#    last matters: stripping a leading/trailing non-ASCII character (e.g. an
#    emoji) can expose whitespace that would otherwise survive at the edges.
#    This also strips accented letters (e.g. "café" -> "caf").
text_columns = ["title", "review_text"]

for colname in text_columns:
    df = df.withColumn(
        colname,
        trim(regexp_replace(col(colname), "[^\\x00-\\x7F]", "")),
    )

# Lowercase review_text only; title keeps its original casing.
df = df.withColumn("review_text", lower(col("review_text")))

# 5️⃣ Drop very short reviews (measured after cleaning)
df = df.filter(length(col("review_text")) >= MIN_REVIEW_CHARS)

# ✅ Verify results
df.printSchema()
df.show(5, truncate=False)


root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)

+--------------------------------+--------+------------------------------------+--------------------------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql.functions import avg, count, round as rnd, size, split, col, regexp_replace

# 1️⃣ Compute review length in words. Strip leading/trailing whitespace, then
#    split on runs of whitespace rather than a single " ". Otherwise repeated
#    spaces (e.g. left where non-ASCII characters were removed) and edge
#    whitespace produce empty "words", and newlines/tabs are not treated as
#    separators at all.
df = df.withColumn(
    "review_length_words",
    size(split(regexp_replace(col("review_text"), "^\\s+|\\s+$", ""), "\\s+")),
)

# 2️⃣ Aggregate features by book_id
book_features = (
    df.groupBy("book_id")
      .agg(
          rnd(avg("rating"), 2).alias("avg_rating_per_book"),
          count("review_id").alias("review_count_per_book")
      )
)

# 3️⃣ Aggregate features by user_id (instead of author_id)
user_features = (
    df.groupBy("user_id")
      .agg(
          rnd(avg("rating"), 2).alias("avg_rating_per_user"),
          count("review_id").alias("review_count_per_user")
      )
)

# 4️⃣ Join the features back into main DataFrame (one row per review).
# NOTE(review): the per-book / per-user averages are computed over all rows,
# including the row's own rating. If these features are used to predict
# `rating`, that is target leakage — TODO confirm intended use.
df_features = (
    df.join(book_features, "book_id", "left")
      .join(user_features, "user_id", "left")
)

# ✅ Verify the enriched data
df_features.select(
    "book_id", "user_id", "rating", "review_length_words",
    "avg_rating_per_book", "review_count_per_book",
    "avg_rating_per_user", "review_count_per_user"
).show(5, truncate=False)


+--------+--------------------------------+------+-------------------+-------------------+---------------------+-------------------+---------------------+
|book_id |user_id                         |rating|review_length_words|avg_rating_per_book|review_count_per_book|avg_rating_per_user|review_count_per_user|
+--------+--------------------------------+------+-------------------+-------------------+---------------------+-------------------+---------------------+
|30283662|ee28a2479239420b6531b5a4c5e06155|5     |4                  |4.4                |269                  |4.11               |595                  |
|13493463|c1fce9393052ede7b42cfe9daf2b8fbf|3     |130                |3.42               |438                  |3.72               |243                  |
|18338887|07675e08d7f974f74f44dc7af1290f25|2     |61                 |3.5                |2                    |3.46               |13                   |
|19039662|1983c753e61f3a5d122c828bae1cc43d|4     |14                 |

In [0]:
# Persist the enriched, review-level feature table to the Gold layer as Delta.
# mode="overwrite" replaces whatever a previous run wrote at this path.
df_features.write.save(
    "abfss://lakehouse@goodreadsreviews60103737.dfs.core.windows.net/gold/features_v1/",
    format="delta",
    mode="overwrite",
)


In [0]:
# Read features_v1 back from Gold to check the write round-trips.
features_v1 = spark.read.load(
    "abfss://lakehouse@goodreadsreviews60103737.dfs.core.windows.net/gold/features_v1/",
    format="delta",
)

# Show the schema, the total row count, then a sample of untruncated rows.
features_v1.printSchema()
total_records = features_v1.count()
print(f"Total records: {total_records}")
features_v1.show(5, truncate=False)


root
 |-- user_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- avg_rating_per_book: double (nullable = true)
 |-- review_count_per_book: long (nullable = true)
 |-- avg_rating_per_user: double (nullable = true)
 |-- review_count_per_user: long (nullable = true)

Total records: 14971371
+--------------------------------+--------+--------------------------------+----------------------------------------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------