In [0]:
# Authenticate Spark against the ADLS Gen2 storage account.
# The account key is fetched from a Databricks secret scope at run time so it
# never appears in the notebook source, its saved outputs, or version control.
# NOTE(review): the scope and key names below are placeholders; point them at
# the secret scope that actually holds this storage account's access key.
spark.conf.set(
  "fs.azure.account.key.goodreadsreviews60300832.dfs.core.windows.net",
  dbutils.secrets.get(scope="goodreads-lakehouse", key="storage-account-key")
)

In [0]:
# Storage account and container that hold every layer of the lakehouse.
storage = "goodreadsreviews60300832"
container = "lakehouse"

# All table locations hang off the same abfss:// container root.
lake_root = f"abfss://{container}@{storage}.dfs.core.windows.net/"

# Silver (processed) inputs and the gold feature-table output.
silver_reviews = lake_root + "processed/reviews/"
silver_books = lake_root + "processed/books/"
silver_authors = lake_root + "processed/authors/"
gold_features = lake_root + "gold/features_v1/"

# Sanity check: list the container root.
display(dbutils.fs.ls(lake_root))


path,name,size,modificationTime
abfss://lakehouse@goodreadsreviews60300832.dfs.core.windows.net/gold/,gold/,0,1762246719000
abfss://lakehouse@goodreadsreviews60300832.dfs.core.windows.net/processed/,processed/,0,1762090610000
abfss://lakehouse@goodreadsreviews60300832.dfs.core.windows.net/raw/,raw/,0,1762084280000


In [0]:
# Read the three silver parquet datasets (books, authors, reviews).
books, authors, reviews = (
    spark.read.parquet(location)
    for location in (silver_books, silver_authors, silver_reviews)
)

# Report the size of each input, then show the reviews schema.
row_counts = {
    "books": books.count(),
    "authors": authors.count(),
    "reviews": reviews.count(),
}
print("row counts:", row_counts)
reviews.printSchema()

row counts: {'books': 3323621, 'authors': 829529, 'reviews': 14971371}
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- n_votes: long (nullable = true)
 |-- date_added: string (nullable = true)



In [0]:
# ===== 2) clean reviews =====
from pyspark.sql import functions as f

df = reviews
present = set(df.columns)

# Drop rows that are missing any mandatory field.
mandatory = [name for name in ("review_id", "book_id", "user_id", "rating", "review_text")
             if name in present]
for name in mandatory:
    df = df.where(f.col(name).isNotNull())

# Identifier columns are normalised to strings (only those that exist).
id_cols = [name for name in ("review_id", "book_id", "user_id", "author_id")
           if name in present]
for name in id_cols:
    df = df.withColumn(name, f.col(name).cast("string"))

# Ratings: integer, restricted to the inclusive range 1..5.
df = df.withColumn("rating", f.col("rating").cast("int"))
df = df.where(f.col("rating").between(1, 5))

# Review text: lower-cased and trimmed; keep only texts of at least 10 characters.
normalized_text = f.trim(f.lower(f.col("review_text")))
df = df.withColumn("review_text", normalized_text)
df = df.where(f.length("review_text") >= 10)

# One row per review_id.
df = df.dropDuplicates(["review_id"])

# The timestamp pattern below is parsed under the legacy parser policy
# (session-wide setting).
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# When date_added is still text (e.g. "Tue Mar 17 13:18:31 -0700 2015"),
# add a parsed timestamp column next to it.
if dict(df.dtypes).get("date_added") == "string":
    parsed_ts = f.to_timestamp(f.col("date_added"), "EEE MMM dd HH:mm:ss Z yyyy")
    df = df.withColumn("date_added_ts", parsed_ts)

# Fill defaults for optional columns.
if "n_votes" in df.columns:
    votes_as_int = f.col("n_votes").cast("int")
    df = df.withColumn("n_votes", f.coalesce(votes_as_int, f.lit(0)))
if "language" in df.columns:
    df = df.withColumn("language", f.coalesce(f.col("language"), f.lit("unknown")))

# Project onto the known output columns, in this fixed order, skipping absent ones.
wanted = ["review_id", "book_id", "user_id", "author_id", "title", "name",
          "rating", "review_text", "language", "n_votes", "date_added", "date_added_ts"]
keep = [name for name in wanted if name in df.columns]
df = df.select(*keep)

# Cache before counting so the count materialises the cleaned frame for reuse.
df.cache()
print("cleaned rows:", df.count())
df.show(5, truncate=False)


cleaned rows: 14971371
+--------------------------------+--------+--------------------------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:

# Review length = number of whitespace-separated tokens in the review text.
tokens = f.split(f.col("review_text"), r"\s+")
df = df.withColumn("review_length", f.size(tokens).cast("int"))

# Per-book aggregates: mean rating, review count, mean review length.
book_aggs = [
    f.avg("rating").alias("avg_rating"),
    f.count("*").alias("num_reviews"),
    f.avg("review_length").alias("avg_review_length"),
]
by_book = df.groupBy("book_id").agg(*book_aggs)

# Attach the book-level aggregates to every review row.
features = df.join(by_book, on="book_id", how="left")

# Per-author aggregates are only built (and joined) when a "name" column exists.
by_author = None
if "name" in df.columns:
    author_aggs = [
        f.avg("rating").alias("avg_rating_author"),
        f.count("*").alias("num_reviews_author"),
    ]
    by_author = df.groupBy("name").agg(*author_aggs)
    features = features.join(by_author, on="name", how="left")

features.printSchema()
features.show(5, truncate=False)


root
 |-- book_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- n_votes: integer (nullable = false)
 |-- date_added: string (nullable = true)
 |-- date_added_ts: timestamp (nullable = true)
 |-- review_length: integer (nullable = false)
 |-- avg_rating: double (nullable = true)
 |-- num_reviews: long (nullable = true)
 |-- avg_review_length: double (nullable = true)

+--------+--------------------------------+--------------------------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:

# Replace non-finite values (NaN, +Infinity, -Infinity) with NULL before the
# table is persisted.
#
# Spark reports dtypes as simple strings: LongType is "bigint" (never "long")
# and decimals look like "decimal(10,2)", so decimals are matched by prefix.
num_types = {"int", "bigint", "double", "float", "smallint", "tinyint"}
# All numeric columns; kept as a notebook-level name in case later cells use it.
num_cols = [c for c, t in features.dtypes if t in num_types or t.startswith("decimal")]

# Only floating-point columns can actually hold NaN / +-Infinity. Integral and
# decimal columns are passed through untouched, which also preserves their
# non-null schema metadata.
float_cols = {c for c, t in features.dtypes if t in ("float", "double")}


def _null_if_not_finite(name):
    """Return column `name` with NaN and +/-Infinity replaced by NULL."""
    value = f.col(name)
    is_non_finite = f.isnan(value) | value.isin(float("inf"), float("-inf"))
    return f.when(is_non_finite, None).otherwise(value).alias(name)


# A single select keeps the column order and adds one projection to the plan,
# instead of one projection per column as a withColumn loop would.
features = features.select(*[
    _null_if_not_finite(c) if c in float_cols else f.col(c)
    for c in features.columns
])

print("final columns:", features.columns)


final columns: ['book_id', 'review_id', 'user_id', 'rating', 'review_text', 'n_votes', 'date_added', 'date_added_ts', 'review_length', 'avg_rating', 'num_reviews', 'avg_review_length']


In [0]:

# Persist the feature table at the gold path in Delta format, replacing
# whatever was stored there before.
(
    features.write
    .format("delta")
    .mode("overwrite")
    .save(gold_features)
)


In [0]:

# Read the Delta table back from the gold path to confirm the write.
gold = spark.read.load(gold_features, format="delta")
print("gold rows:", gold.count())
gold.printSchema()

# Show the rows belonging to the most-reviewed books first.
gold.sort(f.col("num_reviews").desc()).show(10, truncate=False)


gold rows: 14971371
root
 |-- book_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: string (nullable = true)
 |-- date_added_ts: timestamp (nullable = true)
 |-- review_length: integer (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- num_reviews: long (nullable = true)
 |-- avg_review_length: double (nullable = true)

+--------+--------------------------------+--------------------------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------