In [10]:
# ============================================================
# SAFE META CLEANER (handles missing cols, case issues, GCS output)
# ============================================================
from pyspark.sql import functions as F
from pyspark.sql import types as T
import time

# --- GCS locations ---
GCS_BASE            = "gs://qst843-project/amazon_reviews_2023"
META_RAW_ROOT       = f"{GCS_BASE}/bronze/meta_parquet"
META_CLEAN_ROOT     = f"{GCS_BASE}/bronze/clean_data/meta"

spark.conf.set("spark.sql.caseSensitive", "true")
spark.conf.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("spark.hadoop.google.cloud.project", "qst843-project")

# --- helpers ---
def col_if_exists(df, name, dtype="string"):
    """Return column if it exists, else NULL of desired type."""
    return F.col(name) if name in df.columns else F.lit(None).cast(dtype)

def size_if_exists(df, name):
    """Return size(col) if it exists else 0."""
    return F.size(F.col(name)) if name in df.columns else F.lit(0)

# --- main cleaner ---
def clean_existing_meta(cat, do_count=True):
    src_path  = f"{META_RAW_ROOT}/{cat}"
    dest_path = f"{META_CLEAN_ROOT}/{cat}"

    print(f"[{time.strftime('%H:%M:%S')}] START cleaning {cat}")
    df = spark.read.parquet(src_path)

    # --- guarantee parent_asin ---
    df = df.withColumn(
        "parent_asin",
        F.coalesce(col_if_exists(df, "parent_asin"), col_if_exists(df, "asin")).cast("string")
    )

    # --- product flags ---
    df = df.withColumn("product_image", (F.coalesce(size_if_exists(df, "images"), F.lit(0)) > 0)) \
           .withColumn("product_video", (F.coalesce(size_if_exists(df, "videos"), F.lit(0)) > 0))

    # --- drop large/unneeded fields if they exist ---
    for c in ["images", "videos", "details", "_corrupt_record"]:
        if c in df.columns:
            df = df.drop(c)

    # --- trim array text fields ---
    def trim_array(colname):
        if colname in df.columns:
            return F.transform(
                F.col(colname),
                lambda x: F.trim(F.regexp_replace(x.cast("string"), r"\s+", " "))
            )
        return F.lit(None).cast("array<string>")

    df = df.withColumn("features", trim_array("features")) \
           .withColumn("description", trim_array("description"))

    # --- trim scalar text fields ---
    for c in ["title", "brand", "store"]:
        if c in df.columns:
            df = df.withColumn(c, F.trim(F.regexp_replace(F.col(c).cast("string"), r"\s+", " ")))
    if "store" in df.columns:
        df = df.withColumn("store", F.lower(F.col("store")))

    # --- handle nested categories ---
    if "main_category" not in df.columns and "main_cat" in df.columns:
        df = df.withColumn("main_category", F.col("main_cat"))
    if "categories" in df.columns:
        dt = [f for f in df.schema if f.name == "categories"][0].dataType
        if isinstance(dt, T.ArrayType) and isinstance(dt.elementType, T.ArrayType):
            df = df.withColumn(
                "categories", F.expr("transform(categories, x -> array_join(x, ' > '))")
            )

    # --- parse price into double ---
    if "price" in df.columns:
        df = df.withColumn(
            "price",
            F.when(F.col("price").cast("double").isNotNull(), F.col("price").cast("double"))
             .otherwise(F.regexp_replace(F.col("price").cast("string"), r"[^0-9.\-]", "").cast("double"))
        )

    # --- reorder & ensure missing columns exist ---
    ordered_cols = [
        ("parent_asin", T.StringType()),
        ("title", T.StringType()),
        ("main_category", T.StringType()),
        ("categories", T.ArrayType(T.StringType())),
        ("price", T.DoubleType()),
        ("features", T.ArrayType(T.StringType())),
        ("description", T.ArrayType(T.StringType())),
        ("average_rating", T.DoubleType()),
        ("rating_number", T.LongType()),
        ("brand", T.StringType()),
        ("store", T.StringType()),
        ("product_image", T.BooleanType()),
        ("product_video", T.BooleanType()),
    ]

    for c, t in ordered_cols:
        if c not in df.columns:
            df = df.withColumn(c, F.lit(None).cast(t))
    df = df.select(*[F.col(c).cast(t) for c, t in ordered_cols])

    # --- write cleaned output ---
    (df.coalesce(8)
        .write.mode("overwrite")
        .option("compression", "snappy")
        .parquet(dest_path))

    if do_count:
        print(f"Total rows: {df.count():,}")
    print(f"✅ Cleaned meta written to: {dest_path}\n")

# Example usage:
# clean_existing_meta("All_Beauty")
# clean_existing_meta("Electronics")


In [28]:
# clean_existing_meta("All_Beauty")
# clean_existing_meta("Arts_Crafts_and_Sewing")
# clean_existing_meta("Automotive")
# clean_existing_meta("Baby_Products")
# clean_existing_meta("Cell_Phones_and_Accessories")
# clean_existing_meta("Grocery_and_Gourmet_Food")
# clean_existing_meta("Health_and_Personal_Care")
# clean_existing_meta("Musical_Instruments")
# clean_existing_meta("Office_Products")
# clean_existing_meta("Patio_Lawn_and_Garden")
# clean_existing_meta("Pet_Supplies")
# clean_existing_meta("Sports_and_Outdoors")
# clean_existing_meta("Toys_and_Games")

[00:49:44] START cleaning Toys_and_Games




Total rows: 890,874
✅ Cleaned meta written to: gs://qst843-project/amazon_reviews_2023/bronze/clean_data/meta/Toys_and_Games



                                                                                

In [33]:
# ============================================================
# SAFE REVIEWS CLEANER (handles missing cols, timestamp ms→ts, dedupe)
# ============================================================
from pyspark.sql import functions as F, types as T
import time

# --- GCS locations ---
GCS_BASE                 = "gs://qst843-project/amazon_reviews_2023"
REVIEWS_RAW_ROOT         = f"{GCS_BASE}/bronze/reviews_parquet_by_cat"
REVIEWS_CLEAN_ROOT       = f"{GCS_BASE}/bronze/clean_data/reviews"

spark.conf.set("spark.sql.caseSensitive", "true")
spark.conf.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("spark.hadoop.google.cloud.project", "qst843-project")

# --- helpers ---
def col_if_exists(df, name, dtype="string"):
    return F.col(name) if name in df.columns else F.lit(None).cast(dtype)

def size_if_exists(df, name):
    return F.size(F.col(name)) if name in df.columns else F.lit(0)

def clean_existing_reviews(cat: str, do_count: bool = True):
    """
    Read raw REVIEWS parquet for a category, clean it, and write to clean_data/reviews/<cat>.
    Keeps originals under bronze/reviews_parquet_by_cat/<cat>.
    """
    src_path  = f"{REVIEWS_RAW_ROOT}/{cat}"
    dest_path = f"{REVIEWS_CLEAN_ROOT}/{cat}"

    print(f"[{time.strftime('%H:%M:%S')}] START reviews clean {cat}")
    df = spark.read.parquet(src_path)

    # --- guarantee parent_asin ---
    df = df.withColumn(
        "parent_asin",
        F.coalesce(col_if_exists(df, "parent_asin"), col_if_exists(df, "asin")).cast("string")
    )

    # --- rating, helpful_vote, verified_purchase types ---
    df = df.withColumn("rating", col_if_exists(df, "rating", "double").cast("int")) \
           .withColumn("helpful_vote", col_if_exists(df, "helpful_vote", "long").cast("int")) \
           .withColumn("verified_purchase", col_if_exists(df, "verified_purchase", "boolean").cast("boolean"))

    # --- timestamp(ms) → timestamp ---
    # Some dumps use 'timestamp' (ms), others 'unixReviewTime' (sec)
    ts_ms = col_if_exists(df, "timestamp", "long")
    ts_sec_alt = col_if_exists(df, "unixReviewTime", "long")
    df = df.withColumn(
            "_ts_sec",
            F.when(ts_ms.isNotNull(), (ts_ms.cast("double")/1000.0))
             .otherwise(ts_sec_alt.cast("double"))
        ) \
        .withColumn("timestamp",
            F.to_timestamp(F.from_unixtime(F.col("_ts_sec")))
        ).drop("_ts_sec")

    # --- text fields tidy ---
    for c in ["title","text"]:
        df = df.withColumn(
            c,
            F.trim(F.regexp_replace(col_if_exists(df, c).cast("string"), r"\s+", " "))
        )

    # --- review image flag & drop heavy arrays ---
    df = df.withColumn("review_image", (F.coalesce(size_if_exists(df, "images"), F.lit(0)) > 0))
    if "images" in df.columns:
        df = df.drop("images")

    # --- basic filters: rating in [1,5] if rating exists ---
    if "rating" in df.columns:
        df = df.where((F.col("rating") >= 1) & (F.col("rating") <= 5))

    # --- dedupe ---
    # key uses what we likely have: (user_id, parent_asin, timestamp, title)
    df = df.dropDuplicates(["user_id", "parent_asin", "timestamp", "title"])

    # --- final columns (ensure presence) ---
    ordered_cols = [
        ("user_id",          T.StringType()),
        ("parent_asin",      T.StringType()),
        ("timestamp",        T.TimestampType()),
        ("rating",           T.IntegerType()),
        ("title",            T.StringType()),
        ("text",             T.StringType()),
        ("helpful_vote",     T.IntegerType()),
        ("verified_purchase",T.BooleanType()),
        ("review_image",     T.BooleanType()),
    ]
    for c, t in ordered_cols:
        if c not in df.columns:
            df = df.withColumn(c, F.lit(None).cast(t))
    df = df.select(*[F.col(c).cast(t) for c, t in ordered_cols])

    # --- write cleaned copy (originals preserved) ---
    (df.coalesce(8)
       .write.mode("overwrite")
       .option("compression","snappy")
       .parquet(dest_path))

    if do_count:
        print(f"Total rows: {df.count():,}")
    print(f"✅ Cleaned reviews written to: {dest_path}\n")


In [47]:
# clean_existing_reviews("All_Beauty")
# clean_existing_reviews("Arts_Crafts_and_Sewing")
# clean_existing_reviews("Automotive")
# clean_existing_reviews("Baby_Products")
# clean_existing_reviews("Cell_Phones_and_Accessories")
# clean_existing_reviews("Grocery_and_Gourmet_Food")
# clean_existing_reviews("Health_and_Personal_Care")
# clean_existing_reviews("Musical_Instruments")
# clean_existing_reviews("Office_Products")
# clean_existing_reviews("Patio_Lawn_and_Garden")


# clean_existing_reviews("Pet_Supplies")
# clean_existing_reviews("Sports_and_Outdoors")
# clean_existing_reviews("Toys_and_Games")


[02:00:32] START reviews clean Sports_and_Outdoors


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/miniconda3/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [37]:
import os, glob, shutil, pathlib

# Spark temp dirs
for root in ("/tmp", "/var/tmp"):
    for p in glob.glob(os.path.join(root, "spark-*")):
        try:
            shutil.rmtree(p, ignore_errors=True)
        except: pass

# Our staging
for p in glob.glob("/mnt/tmp/meta_*.jsonl*") + glob.glob("/mnt/tmp/review_*.jsonl*"):
    try: os.remove(p)
    except: pass

# Optional: wipe any previous local outputs used for testing
for p in ["/mnt/data/amazon2023_local", "/mnt/data/spark-tmp"]:
    if os.path.exists(p):
        shutil.rmtree(p, ignore_errors=True)

print("Local cleanup done.")


Local cleanup done.


In [2]:
# ============================================================
# BUILD + COMPACT CLEANED META (no precombined source needed)
# ============================================================
from pyspark.sql import functions as F
import time

# Categories you want
META_CATS = [
    "All_Beauty",
    "Arts_Crafts_and_Sewing",
    "Automotive",
    "Baby_Products",
    "Cell_Phones_and_Accessories",
    "Grocery_and_Gourmet_Food",
    "Health_and_Personal_Care",
    "Musical_Instruments",
    "Office_Products",
    "Patio_Lawn_and_Garden",
]

GCS_BASE            = "gs://qst843-project/amazon_reviews_2023"
META_CLEAN_ROOT     = f"{GCS_BASE}/bronze/clean_data/meta"
DST                 = f"{GCS_BASE}/silver/meta_combined_compact"   # final compacted output

spark.conf.set("spark.hadoop.fs.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("spark.hadoop.google.cloud.project","qst843-project")
spark.conf.set("spark.sql.adaptive.enabled","true")
spark.conf.set("spark.sql.shuffle.partitions","128")

def path_exists(uri: str) -> bool:
    try:
        jvm = spark._jvm
        conf = spark._jsc.hadoopConfiguration()
        fs = jvm.org.apache.hadoop.fs.FileSystem.get(jvm.java.net.URI(uri), conf)
        return fs.exists(jvm.org.apache.hadoop.fs.Path(uri))
    except Exception:
        return False

print(f"[{time.strftime('%H:%M:%S')}] Scanning cleaned meta folders...")
paths = []
for cat in META_CATS:
    p = f"{META_CLEAN_ROOT}/{cat}"
    if path_exists(p):
        print(f"  ✔ {cat}")
        paths.append((cat, p))
    else:
        print(f"  ⚠️  missing: {p}")

if not paths:
    raise FileNotFoundError("No cleaned meta folders found under bronze/clean_data/meta for the given categories.")

# Read & union with category tag (append-by-loop to avoid schema pain)
df_all = None
for i, (cat, p) in enumerate(paths, 1):
    print(f"[{time.strftime('%H:%M:%S')}] ► ({i}/{len(paths)}) Reading {cat}")
    df_cat = (spark.read
                 .option("recursiveFileLookup","true")
                 .parquet(p)
             ).withColumn("category_name", F.lit(cat))
    if df_all is None:
        df_all = df_cat
    else:
        df_all = df_all.unionByName(df_cat, allowMissingColumns=True)

print(f"[{time.strftime('%H:%M:%S')}] Repartitioning and writing compact dataset to: {DST}")
n_files = 16  # adjust to 8/16/32 as needed
(df_all.repartition(n_files)
      .write.mode("overwrite")
      .option("compression","snappy")
      .parquet(DST))

print(f"[{time.strftime('%H:%M:%S')}] ✅ Done.")
print("Output:", DST)


[02:51:45] Scanning cleaned meta folders...
  ✔ All_Beauty
  ✔ Arts_Crafts_and_Sewing
  ✔ Automotive
  ✔ Baby_Products
  ✔ Cell_Phones_and_Accessories
  ✔ Grocery_and_Gourmet_Food
  ✔ Health_and_Personal_Care
  ✔ Musical_Instruments
  ✔ Office_Products
  ✔ Patio_Lawn_and_Garden
[02:51:45] ► (1/10) Reading All_Beauty


                                                                                

[02:51:48] ► (2/10) Reading Arts_Crafts_and_Sewing
[02:51:49] ► (3/10) Reading Automotive
[02:51:49] ► (4/10) Reading Baby_Products
[02:51:50] ► (5/10) Reading Cell_Phones_and_Accessories
[02:51:50] ► (6/10) Reading Grocery_and_Gourmet_Food
[02:51:51] ► (7/10) Reading Health_and_Personal_Care
[02:51:51] ► (8/10) Reading Musical_Instruments
[02:51:51] ► (9/10) Reading Office_Products
[02:51:52] ► (10/10) Reading Patio_Lawn_and_Garden
[02:51:52] Repartitioning and writing compact dataset to: gs://qst843-project/amazon_reviews_2023/silver/meta_combined_compact


                                                                                

[02:52:36] ✅ Done.
Output: gs://qst843-project/amazon_reviews_2023/silver/meta_combined_compact


In [4]:
# ============================================================
# BUILD + COMPACT CLEANED REVIEWS (no precombined source needed)
# ============================================================
from pyspark.sql import functions as F
import time

# Categories to combine (your list)
REV_CATS = [
    "All_Beauty",
    "Arts_Crafts_and_Sewing",
    "Automotive",
    "Baby_Products",
    "Cell_Phones_and_Accessories",
    "Grocery_and_Gourmet_Food",
    "Health_and_Personal_Care",
    "Musical_Instruments",
    "Office_Products",
    "Patio_Lawn_and_Garden",
]

GCS_BASE              = "gs://qst843-project/amazon_reviews_2023"
REVIEWS_CLEAN_ROOT    = f"{GCS_BASE}/bronze/clean_data/reviews"
REV_COMBINED_COMPACT  = f"{GCS_BASE}/silver/reviews_combined_compact"  # final compacted output

# GCS + Spark settings
spark.conf.set("spark.hadoop.fs.gs.impl","com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("spark.hadoop.google.cloud.project","qst843-project")
spark.conf.set("spark.sql.adaptive.enabled","true")
spark.conf.set("spark.sql.shuffle.partitions","128")
spark.conf.set("spark.sql.files.ignoreMissingFiles","true")

def path_exists(uri: str) -> bool:
    try:
        jvm = spark._jvm
        conf = spark._jsc.hadoopConfiguration()
        fs = jvm.org.apache.hadoop.fs.FileSystem.get(jvm.java.net.URI(uri), conf)
        return fs.exists(jvm.org.apache.hadoop.fs.Path(uri))
    except Exception:
        return False

print(f"[{time.strftime('%H:%M:%S')}] Scanning cleaned reviews folders...")
existing = []
for cat in REV_CATS:
    p = f"{REVIEWS_CLEAN_ROOT}/{cat}"
    if path_exists(p):
        print(f"  ✔ {cat}")
        existing.append((cat, p))
    else:
        print(f"  ⚠️  missing: {p}")

if not existing:
    raise FileNotFoundError("No cleaned reviews found under bronze/clean_data/reviews for the given categories.")

# Read + union incrementally (robust to schema drift)
df_all = None
for i, (cat, p) in enumerate(existing, 1):
    print(f"[{time.strftime('%H:%M:%S')}] ► ({i}/{len(existing)}) Reading {cat}")
    df_cat = (spark.read
                .option("recursiveFileLookup","true")
                .parquet(p)
              ).withColumn("category_name", F.lit(cat))
    if df_all is None:
        df_all = df_cat
    else:
        df_all = df_all.unionByName(df_cat, allowMissingColumns=True)

# Compact to a small number of large files (fast to read)
n_files = 16   # adjust to 8/16/32 as needed (lower if disk is tight)
print(f"[{time.strftime('%H:%M:%S')}] Writing compacted reviews to: {REV_COMBINED_COMPACT} as ~{n_files} files")
(df_all.repartition(n_files)
      .write.mode("overwrite")
      .option("compression","snappy")
      .parquet(REV_COMBINED_COMPACT))

print(f"[{time.strftime('%H:%M:%S')}] ✅ Done.")
print("Output:", REV_COMBINED_COMPACT)


[03:00:58] Scanning cleaned reviews folders...
  ✔ All_Beauty
  ✔ Arts_Crafts_and_Sewing
  ✔ Automotive
  ✔ Baby_Products
  ✔ Cell_Phones_and_Accessories
  ✔ Grocery_and_Gourmet_Food
  ✔ Health_and_Personal_Care
  ✔ Musical_Instruments
  ✔ Office_Products
  ✔ Patio_Lawn_and_Garden
[03:00:58] ► (1/10) Reading All_Beauty
[03:00:58] ► (2/10) Reading Arts_Crafts_and_Sewing
[03:00:59] ► (3/10) Reading Automotive
[03:00:59] ► (4/10) Reading Baby_Products
[03:00:59] ► (5/10) Reading Cell_Phones_and_Accessories
[03:01:00] ► (6/10) Reading Grocery_and_Gourmet_Food
[03:01:00] ► (7/10) Reading Health_and_Personal_Care
[03:01:00] ► (8/10) Reading Musical_Instruments
[03:01:01] ► (9/10) Reading Office_Products
[03:01:01] ► (10/10) Reading Patio_Lawn_and_Garden
[03:01:01] Writing compacted reviews to: gs://qst843-project/amazon_reviews_2023/silver/reviews_combined_compact as ~16 files


                                                                                

[03:02:44] ✅ Done.
Output: gs://qst843-project/amazon_reviews_2023/silver/reviews_combined_compact


In [5]:
# ============================================================
# SANITY CHECK: Combined Reviews Compact Dataset
# ============================================================
from pyspark.sql import functions as F
import time

REV_COMBINED_COMPACT = "gs://qst843-project/amazon_reviews_2023/silver/reviews_combined_compact"

print(f"[{time.strftime('%H:%M:%S')}] Checking combined reviews parquet at: {REV_COMBINED_COMPACT}")

# Try reading a few files (recursive handles nested folders)
df_reviews = (spark.read
                  .option("recursiveFileLookup","true")
                  .parquet(REV_COMBINED_COMPACT))

# --- Basic checks ---
print(f"\n[✓] Schema:")
df_reviews.printSchema()

print(f"\n[✓] Total rows (approx): {df_reviews.count():,}")

print("\n[✓] Sample records:")
df_reviews.select(
    "user_id",
    "parent_asin",
    "timestamp",
    "rating",
    "title",
    "category_name"
).show(10, truncate=True)

# --- Additional quality spot checks ---
print("\n[✓] Rating distribution:")
df_reviews.groupBy("rating").count().orderBy("rating").show()

print("\n[✓] Review counts per category:")
df_reviews.groupBy("category_name").count().orderBy(F.desc("count")).show()

print("\n[✓] Null count summary (subset of key columns):")
df_reviews.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in ["user_id","parent_asin","rating","text","timestamp"]
]).show()


[03:03:44] Checking combined reviews parquet at: gs://qst843-project/amazon_reviews_2023/silver/reviews_combined_compact

[✓] Schema:
root
 |-- user_id: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- helpful_vote: integer (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- review_image: boolean (nullable = true)
 |-- category_name: string (nullable = true)



                                                                                


[✓] Total rows (approx): 102,531,726

[✓] Sample records:


                                                                                

+--------------------+-----------+-------------------+------+--------------------+--------------------+
|             user_id|parent_asin|          timestamp|rating|               title|       category_name|
+--------------------+-----------+-------------------+------+--------------------+--------------------+
|AGZPNP4EC4Z7CTHY2...| B07V34XSJ8|2021-05-30 08:51:53|     5|        Comfortable!|Arts_Crafts_and_S...|
|AFOCCQXZYCTLGLQ4Y...| B0047BITNI|2015-03-10 18:23:14|     5|          Five Stars|Arts_Crafts_and_S...|
|AE5XOXRPK5ZCDD2DC...| B08Z7CRNSC|2022-01-17 16:45:32|     5|        Very pleased|Arts_Crafts_and_S...|
|AE4JS4KHF5SU7PICZ...| B007C7XPME|2020-11-23 13:29:21|     5|        Fun and Easy|Arts_Crafts_and_S...|
|AHZW6N77UGOLTYM6A...| B00FFFR7E2|2020-01-28 07:32:49|     5|        They are big|Arts_Crafts_and_S...|
|AEXD6MEZ562LW7JGA...| B005R4FEKA|2018-05-26 02:03:41|     5|          Five Stars|Arts_Crafts_and_S...|
|AEK5UPTJQEIOPKJNI...| B071S4747T|2019-08-08 04:24:17|     5|Gre

                                                                                

+------+--------+
|rating|   count|
+------+--------+
|     1|12321453|
|     2| 5131627|
|     3| 6774232|
|     4|11112738|
|     5|67191676|
+------+--------+


[✓] Review counts per category:


                                                                                

+--------------------+--------+
|       category_name|   count|
+--------------------+--------+
|Cell_Phones_and_A...|20576383|
|          Automotive|19723213|
|Patio_Lawn_and_Ga...|16318138|
|Grocery_and_Gourm...|14187554|
|     Office_Products|12715091|
|Arts_Crafts_and_S...| 8876371|
|       Baby_Products| 5967954|
| Musical_Instruments| 2983780|
|          All_Beauty|  694252|
|Health_and_Person...|  488990|
+--------------------+--------+


[✓] Null count summary (subset of key columns):




+-------+-----------+------+----+---------+
|user_id|parent_asin|rating|text|timestamp|
+-------+-----------+------+----+---------+
|      0|          0|     0|   0|        0|
+-------+-----------+------+----+---------+



                                                                                

In [6]:
# ============================================================
# SANITY CHECK: Combined Meta Compact Dataset
# ============================================================
from pyspark.sql import functions as F
import time

META_COMBINED_COMPACT = "gs://qst843-project/amazon_reviews_2023/silver/meta_combined_compact"

print(f"[{time.strftime('%H:%M:%S')}] Checking combined meta parquet at: {META_COMBINED_COMPACT}")

# Read (recursive in case of nested folder layout)
df_meta = (spark.read
               .option("recursiveFileLookup","true")
               .parquet(META_COMBINED_COMPACT))

# --- Basic checks ---
print("\n[✓] Schema:")
df_meta.printSchema()

print("\n[✓] Total rows:", f"{df_meta.count():,}")

print("\n[✓] Sample rows:")
df_meta.select(
    "parent_asin",
    "title",
    "main_category",
    "price",
    "average_rating",
    "rating_number",
    "brand",
    "store",
    "category_name"
).show(10, truncate=True)

# --- Category coverage ---
print("\n[✓] Rows per category_name:")
df_meta.groupBy("category_name").count().orderBy(F.desc("count")).show(truncate=False)

# --- Key nulls summary ---
print("\n[✓] Null counts (key fields):")
df_meta.select(
    F.count(F.when(F.col("parent_asin").isNull(), 1)).alias("null_parent_asin"),
    F.count(F.when(F.col("title").isNull() | (F.length(F.col("title")) == 0), 1)).alias("null_or_empty_title"),
    F.count(F.when(F.col("main_category").isNull(), 1)).alias("null_main_category"),
    F.count(F.when(F.col("price").isNull(), 1)).alias("null_price"),
    F.count(F.when(F.col("average_rating").isNull(), 1)).alias("null_avg_rating"),
    F.count(F.when(F.col("rating_number").isNull(), 1)).alias("null_rating_number"),
).show()

# --- Price sanity ---
print("\n[✓] Price summary (exclude 0/negatives for sanity):")
df_meta.filter(F.col("price").isNotNull() & (F.col("price") > 0)) \
       .select(
           F.expr("percentile(price, array(0.0,0.25,0.5,0.75,0.9,0.99))").alias("percentiles"),
           F.min("price").alias("min"),
           F.max("price").alias("max"),
           F.avg("price").alias("avg")
       ).show(truncate=False)

# --- Rating sanity ---
print("\n[✓] Average rating stats:")
df_meta.filter(F.col("average_rating").isNotNull()) \
       .select(
           F.expr("percentile(average_rating, array(0.0,0.25,0.5,0.75,0.9,0.99))").alias("percentiles"),
           F.min("average_rating").alias("min"),
           F.max("average_rating").alias("max"),
           F.avg("average_rating").alias("avg")
       ).show(truncate=False)

print("\n[✓] Top brands by product count (top 20):")
df_meta.groupBy("brand").count().orderBy(F.desc("count")).show(20, truncate=False)

print("\n[✓] Missing main_category by category_name (top 10):")
df_meta.filter(F.col("main_category").isNull()) \
       .groupBy("category_name").count().orderBy(F.desc("count")).show(10, truncate=False)


[03:05:56] Checking combined meta parquet at: gs://qst843-project/amazon_reviews_2023/silver/meta_combined_compact

[✓] Schema:
root
 |-- parent_asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- price: double (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- average_rating: double (nullable = true)
 |-- rating_number: long (nullable = true)
 |-- brand: string (nullable = true)
 |-- store: string (nullable = true)
 |-- product_image: boolean (nullable = true)
 |-- product_video: boolean (nullable = true)
 |-- category_name: string (nullable = true)



                                                                                


[✓] Total rows: 5,244,716

[✓] Sample rows:
+-----------+--------------------+-------------+-----+--------------+-------------+-----+-------------+-------------+
|parent_asin|               title|main_category|price|average_rating|rating_number|brand|        store|category_name|
+-----------+--------------------+-------------+-----+--------------+-------------+-----+-------------+-------------+
| B01AB5SIXO|NuGene NuEye Eye ...|   All Beauty| NULL|           5.0|            1| NULL|         NULL|   All_Beauty|
| B07DNP5SY9|18INCH #24 Ash Bl...|   All Beauty| NULL|           1.0|            1| NULL|     benehair|   All_Beauty|
| B08F51HG1R|Headbands for Wom...|   All Beauty| NULL|           4.3|           23| NULL|   makersland|   All_Beauty|
| B00IIAJYEC|"THE NASTY" Mascu...|   All Beauty| NULL|           3.2|           45| NULL| spellboundrx|   All_Beauty|
| B07Q8XGVLG|Makeup Blur Remov...|   All Beauty| NULL|           4.4|           24| NULL|  makeup blur|   All_Beauty|
| B07VMGV3S

                                                                                

+---------------------------+-------+
|category_name              |count  |
+---------------------------+-------+
|Cell_Phones_and_Accessories|1288490|
|Patio_Lawn_and_Garden      |851907 |
|Arts_Crafts_and_Sewing     |801446 |
|Office_Products            |710503 |
|Grocery_and_Gourmet_Food   |603274 |
|Automotive                 |384896 |
|Baby_Products              |217724 |
|Musical_Instruments        |213593 |
|All_Beauty                 |112590 |
|Health_and_Personal_Care   |60293  |
+---------------------------+-------+


[✓] Null counts (key fields):


                                                                                

+----------------+-------------------+------------------+----------+---------------+------------------+
|null_parent_asin|null_or_empty_title|null_main_category|null_price|null_avg_rating|null_rating_number|
+----------------+-------------------+------------------+----------+---------------+------------------+
|               0|                372|            317107|   3318499|              0|                 0|
+----------------+-------------------+------------------+----------+---------------+------------------+


[✓] Price summary (exclude 0/negatives for sanity):


                                                                                

+-----------------------------------------------------+----+---------+-----------------+
|percentiles                                          |min |max      |avg              |
+-----------------------------------------------------+----+---------+-----------------+
|[0.01, 10.15, 16.99, 35.04, 89.99, 500.9937999999989]|0.01|1099995.0|51.25955624217865|
+-----------------------------------------------------+----+---------+-----------------+


[✓] Average rating stats:


                                                                                

+------------------------------+---+---+-----------------+
|percentiles                   |min|max|avg              |
+------------------------------+---+---+-----------------+
|[1.0, 3.8, 4.3, 4.7, 5.0, 5.0]|1.0|5.0|4.126237702860981|
+------------------------------+---+---+-----------------+


[✓] Top brands by product count (top 20):


                                                                                

+-----+-------+
|brand|count  |
+-----+-------+
|NULL |5244716|
+-----+-------+


[✓] Missing main_category by category_name (top 10):




+---------------------------+------+
|category_name              |count |
+---------------------------+------+
|Cell_Phones_and_Accessories|112432|
|Arts_Crafts_and_Sewing     |80265 |
|Patio_Lawn_and_Garden      |69823 |
|Office_Products            |23944 |
|Baby_Products              |17880 |
|Grocery_and_Gourmet_Food   |7960  |
|Musical_Instruments        |3392  |
|Automotive                 |1411  |
+---------------------------+------+



                                                                                