## **DAY 6-Design 3-Layer Architecture**

In [0]:
#Build BRONZE
#Just load CSV and store as-is in Delta.(raw)
bronze_df = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv",
    header=True,
    inferSchema=True
)

bronze_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze/events")


In [0]:
bronze_df.show(5)

+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 00:00:01|      view|  17302664|2053013553853497655|                NULL| creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:01|      view|   3601530|2053013563810775923|appliances.kitche...|    lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-11-01 00:00:01|      view|   1004775|2053013555631882655|electronics.smart...|xiaomi

In [0]:
#Build SILVER (Cleaning & Validation)
#remove duplicates and drop nulls in important columns
silver_df = spark.read.format("delta").load(
    "/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze/events"
)

silver_clean = silver_df \
    .dropDuplicates(["event_time", "user_id"]) \
    .dropna(subset=["event_time", "user_id", "event_type"])



In [0]:
#Calculating dropped rows
before_count = silver_df.count()
after_count = silver_clean.count()

dropped_rows = before_count - after_count
print("Total rows dropped:", dropped_rows)


Total rows dropped: 174897


In [0]:
silver_clean.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/Volumes/workspace/ecommerce/ecommerce_data/delta/silver/events")


In [0]:
silver_clean.show(5)

+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|2019-11-17 08:43:03|      view|   2601163|2053013563970159485|                NULL|    king| 119.4|512969200|3baf5107-7ac9-4e0...|
|2019-11-17 08:43:04|      view|   6500705|2053013554155487563|computers.compone...|gigabyte|116.35|522549017|ddef69c8-20b5-0b6...|
|2019-11-17 08:43:16|      view|   3701333|2053013565983425517|appliances.enviro...| philips|200.52|516581294|00bad730-c732-4f6...|
|2019-11-17 08:43:23|      view|  13900092|2053013557343158789|construction.comp...|decoroom| 38.75|522899105|3cca613e-db33-4be...|
|2019-11-17 08:43:33|      view|   5100855|2053013553341792533|  electronics

In [0]:
#Build GOLD (Business Aggregates)
#How many events per event_type?
from pyspark.sql.functions import count

gold_df = spark.read.format("delta").load(
    "/Volumes/workspace/ecommerce/ecommerce_data/delta/silver/events"
)

gold_agg = gold_df.groupBy("event_type").agg(
    count("*").alias("total_events")
)


In [0]:
gold_agg.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/Volumes/workspace/ecommerce/ecommerce_data/delta/gold/event_summary")


In [0]:
gold_agg.show(5)

+----------+------------+
|event_type|total_events|
+----------+------------+
|  purchase|      916923|
|      cart|     2910063|
|      view|    63500096|
+----------+------------+

