In [0]:
from pyspark.sql import functions as F
def bronze(source_path="/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv"):
    """Build the bronze layer: the raw CSV events plus an ingestion timestamp.

    Args:
        source_path: CSV location to read. Defaults to the 2019-Nov extract
            that was previously hard-coded here, so existing ``bronze()``
            calls behave exactly as before.

    Returns:
        Spark DataFrame with every CSV column (header row used for names,
        types inferred by Spark) plus an ``ingestion_ts`` column.
    """
    # NOTE(review): the notebook's `source_path` widget is not passed in here
    # yet; callers must supply the path explicitly to override the default.
    raw_df = spark.read.csv(source_path, header=True, inferSchema=True)
    # current_timestamp() returns the same value for every row within a query,
    # so one load is stamped with a single ingestion time.
    return raw_df.withColumn("ingestion_ts", F.current_timestamp())

In [0]:
def silver():
    """Build the silver layer from the ``bronze`` table.

    Null ``brand`` and ``category_code`` values are replaced with the literal
    "Unknown"; every other column passes through unchanged.

    Returns:
        Spark DataFrame with the bronze schema.
    """
    # fillna with a string value only touches string-typed columns in `subset`.
    return spark.read.table("bronze").fillna("Unknown", subset=["brand", "category_code"])


In [0]:
def gold():
    """Build the gold layer: per-(brand, product_id) view count and revenue.

    Returns:
        Spark DataFrame with columns ``brand``, ``product_id``, ``views``
        (number of view events) and ``revenue`` (sum of ``price`` over
        purchase events, 0 when there are none).
    """
    silver_df = spark.read.table("silver")
    # NOTE(review): assumes view events are labelled "view" in event_type
    # (as in the public 2019-Nov eCommerce dataset) -- confirm against the data.
    is_view = F.col("event_type") == "view"
    is_purchase = F.col("event_type") == "purchase"
    return silver_df.groupBy("brand", "product_id").agg(
        # when() without otherwise() yields null for non-matching rows and
        # count() skips nulls, so only view events with a product_id are counted
        # (previously every event row -- carts and purchases too -- was counted).
        F.count(F.when(is_view, F.col("product_id"))).alias("views"),
        F.sum(F.when(is_purchase, F.col("price")).otherwise(0)).alias("revenue"),
    )

In [0]:
# Notebook parameters, exposed as Databricks widgets.
dbutils.widgets.text("source_path", "/default/path")
dbutils.widgets.dropdown("layer", "bronze", ["bronze", "silver", "gold"])

# Read the current widget values (source_path first, then layer).
# NOTE(review): `source` is not passed to any layer yet; the bronze step
# reads its own CSV path.
source, layer = (dbutils.widgets.get(widget_name) for widget_name in ("source_path", "layer"))

def run_layer(layer_name):
    """Build one pipeline layer and overwrite the table of the same name.

    Args:
        layer_name: one of "bronze", "silver" or "gold".

    Raises:
        ValueError: if ``layer_name`` is not a known layer. ValueError is a
            subclass of Exception, so existing ``except Exception`` handlers
            still catch it.
    """
    valid_layers = ("bronze", "silver", "gold")
    # Validate before resolving the builder functions so a bad name fails fast
    # with a clear message.
    if layer_name not in valid_layers:
        raise ValueError(
            f"Invalid layer: {layer_name} (expected one of: {', '.join(valid_layers)})"
        )
    builders = {"bronze": bronze, "silver": silver, "gold": gold}
    layer_df = builders[layer_name]()
    # Each layer is persisted to a table named after the layer itself.
    layer_df.write.mode("overwrite").saveAsTable(layer_name)
# Execute the layer selected by the "layer" widget.
run_layer(layer_name=layer)

In [0]:
%sql
-- Preview the gold aggregate table.
SELECT * FROM gold