## Data Engineering Case Study – Merkle
This notebook implements a **Data Lake pipeline** with three layers:
- **Bronze (Layer 1):** Raw data from S3
- **Silver (Layer 2):** Cleaned and transformed tables
- **Gold (Layer 3):** Datamart with aggregated metrics (`top_item`)

In [0]:



from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, year, to_timestamp, count, dense_rank, row_number
)
from pyspark.sql.window import Window
import pandas as pd

# Obtain the Spark session used by every cell below (reuses an existing one if present).
spark = SparkSession.builder.getOrCreate()

## Layer 1 – Bronze (Raw Data)
- Load CSV files from S3:
  - `item.csv`
  - `event.csv`
- All columns are stored as STRING
- No transformations applied
- Purpose: preserve **original raw data** for reproducibility

In [0]:
# Locations of the two raw CSV extracts, fetched over HTTPS from S3.
item_csv_url = "https://merkle-de-interview-case-study.s3.eu-central-1.amazonaws.com/de/item.csv"
event_csv_url = "https://merkle-de-interview-case-study.s3.eu-central-1.amazonaws.com/de/event.csv"

# dtype=str stops pandas from guessing numeric/date types, so every value
# stays as the text that was in the file.
item_pd = pd.read_csv(item_csv_url, dtype=str)
event_pd = pd.read_csv(event_csv_url, dtype=str)

## Convert pandas → Spark DataFrames

In [0]:
from pyspark.sql.types import StringType, StructField, StructType


def _to_bronze_df(raw_pd):
    """Convert an all-text pandas frame into a Spark DataFrame of nullable strings.

    The schema is declared explicitly instead of being inferred, so every
    column is a string column regardless of its contents (including columns
    that are entirely empty).

    Parameters
    ----------
    raw_pd : pandas.DataFrame
        Frame read with ``dtype=str``.

    Returns
    -------
    pyspark.sql.DataFrame
        Same columns, same order, all ``StringType`` and nullable.
    """
    string_schema = StructType(
        [StructField(str(name), StringType(), True) for name in raw_pd.columns]
    )
    # pandas marks missing cells with float NaN even under dtype=str. Left as-is,
    # that float can either break schema handling or end up stored as the text
    # "nan"; turning it into None makes it a proper SQL NULL.
    nulls_as_none = raw_pd.astype(object).where(raw_pd.notna(), None)
    return spark.createDataFrame(nulls_as_none, schema=string_schema)


item_bronze_df = _to_bronze_df(item_pd)
event_bronze_df = _to_bronze_df(event_pd)

## Persist Bronze Tables

In [0]:
# Save each raw frame under its Bronze table name, replacing any earlier copy
# so the cell can be re-run safely.
for bronze_df, bronze_table in (
    (item_bronze_df, "bronze_item"),
    (event_bronze_df, "bronze_event"),
):
    bronze_df.write.mode("overwrite").saveAsTable(bronze_table)

## Silver Item Dimension

In [0]:
# Typed versions of the two item columns: `id` becomes an integer via try_cast,
# `name` is cast to string.
item_id_as_int = col("id").try_cast("int")
item_name_as_str = col("name").cast("string")

bronze_item = spark.table("bronze_item")
item_silver_df = bronze_item.withColumn("id", item_id_as_int).withColumn(
    "name", item_name_as_str
)

# Store the typed item dimension, replacing any previous version.
item_silver_df.write.mode("overwrite").saveAsTable("silver_item")

In [0]:
# Quick look at the raw event rows before the payload is parsed.
event_bronze_df.show()

## Silver Event Fact Table

In [0]:
from pyspark.sql.functions import from_json, to_timestamp, year, col
from pyspark.sql.types import StructType, StructField, StringType

# Fields pulled out of the JSON payload; each one is read as a nullable string.
json_schema = StructType([
    StructField("event_name", StringType(), True),
    StructField("platform", StringType(), True),
    StructField("parameter_name", StringType(), True),
    StructField("parameter_value", StringType(), True)
])

# Build the Silver event table: typed timestamp, derived year, flattened payload.
event_silver_df = (
    spark.table("bronze_event")
    .withColumn("event_time", to_timestamp(col("event_time")))
    # event_year is derived from the already-parsed timestamp above.
    .withColumn("event_year", year(col("event_time")))
    # The payload column's name contains a dot, so it is wrapped in backticks
    # to be read as one column name rather than a struct field access.
    .withColumn("json_data", from_json(col("`event.payload`"), json_schema))
    .select(
        col("event_time"),
        col("event_year"),
        col("json_data.event_name"),
        col("json_data.platform"),
        col("json_data.parameter_name"),
        col("json_data.parameter_value")
    )
)

# Persist the Silver event table the same way silver_item is persisted, so the
# Silver layer is materialised for both entities and can be queried by name.
event_silver_df.write.mode("overwrite").saveAsTable("silver_event")

event_silver_df.show(5, truncate=False)

In [0]:
# Keep only the events whose parameter is an item id; for those rows
# `parameter_value` holds the id itself.
is_item_parameter = col("parameter_name") == "item_id"
item_events_df = event_silver_df.where(is_item_parameter)
item_events_df.show()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One row per (item, year): number of item events and the most frequent platform.
# NOTE(review): no filter on event_name is applied here, so every event that
# carries an item_id is counted as a view - confirm this is intended.
events_per_item_year = item_events_df.groupBy("parameter_value", "event_year").agg(
    F.count("*").alias("total_views"),
    F.expr("mode() within group (order by platform)").alias("most_used_platform"),
)

# Expose the grouping key under its business name.
agg_df = events_per_item_year.withColumnRenamed("parameter_value", "item_id")

agg_df.show()

In [0]:
# Rank items within each year, highest view count first; items with equal
# counts share the same rank.
window_spec = Window.partitionBy("event_year").orderBy(F.col("total_views").desc())
yearly_rank = F.rank().over(window_spec).alias("rank")
top_item_df = agg_df.select("*", yearly_rank)
top_item_df.show(10, truncate=False)


In [0]:
# Publish the ranked result as the `top_item` Delta table, replacing any
# previous run's output.
gold_writer = top_item_df.write.format("delta").mode("overwrite")
gold_writer.saveAsTable("top_item")

In [0]:
%sql
-- Items ranked for event year 2016, best rank first.
SELECT * FROM top_item WHERE event_year = 2016 ORDER BY rank