# Read the Bronze data and create DataFrames

In [1]:
# Root folder of the Bronze landing zone in OneLake. Every raw parquet file
# read below lives directly under this folder.
# NOTE(review): "ecoomerce_lakehouse" is spelled exactly as in the original
# paths; presumably that is the lakehouse's real name -- confirm before "fixing".
BRONZE_ROOT = "abfss://ecommerce@onelake.dfs.fabric.microsoft.com/ecoomerce_lakehouse.Lakehouse/Files/Bronze"


def read_bronze(file_name):
    """Read one parquet file from BRONZE_ROOT into a Spark DataFrame.

    Args:
        file_name: File name relative to BRONZE_ROOT, e.g. "orders.parquet".

    Returns:
        The DataFrame produced by ``spark.read.parquet`` for that file.
    """
    return spark.read.parquet(f"{BRONZE_ROOT}/{file_name}")


customers_raw = read_bronze("customers.parquet")
orders_raw = read_bronze("orders.parquet")
payments_raw = read_bronze("payments.parquet")
support_raw = read_bronze("support_tickets.parquet")
web_raw = read_bronze("web_activities.parquet")

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 3, Finished, Available, Finished)

# Preview the Bronze data

In [2]:
# Show the first five raw customer rows as a quick check of the load.
customers_preview = customers_raw.limit(5)
display(customers_preview)

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6cdd6ec3-c0c3-4b35-b622-142336dcfe19)

In [3]:
# Show the first five raw order rows as a quick check of the load.
orders_preview = orders_raw.limit(5)
display(orders_preview)

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1cc2dcdf-4a34-497b-830a-f520bd41aff8)

In [4]:
# Show the first five raw payment rows as a quick check of the load.
payments_preview = payments_raw.limit(5)
display(payments_preview)

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 21ef2908-5fd7-46b3-b6f7-d5c1d33b2fbd)

In [5]:
# Show the first five raw support-ticket rows as a quick check of the load.
support_preview = support_raw.limit(5)
display(support_preview)

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2685d7fa-ef22-444e-9efc-7fe9a73e835f)

In [6]:
# Show the first five raw web-activity rows as a quick check of the load.
web_preview = web_raw.limit(5)
display(web_preview)

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8cf4ae29-f18a-446a-a9e0-04237a10e44b)

# Create Bronze Delta tables

In [8]:
# Persist every raw DataFrame as a Delta table, overwriting any existing copy.
# Keys are the target table names; later cells read them back via spark.table().
bronze_tables = {
    "customers": customers_raw,
    "orders": orders_raw,
    "payments": payments_raw,
    "support": support_raw,
    "web": web_raw,
}
for table_name, raw_df in bronze_tables.items():
    raw_df.write.format("delta").mode("overwrite").saveAsTable(table_name)

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 10, Finished, Available, Finished)

# Clean the data – Silver layer

## Clean customer data

In [10]:
# Look at the raw customer rows once more before cleaning them.
customers_sample = customers_raw.limit(5)
display(customers_sample)

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9bf2ab15-c929-4d6a-98b5-1871c802c99b)

In [12]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Build the Silver customers table: normalise text columns, parse dob, and
# keep one row per customer_id that has both a customer_id and an email.
customers_clean = (
    customers_raw
    # E-mail: strip surrounding whitespace and lower-case.
    # NOTE(review): the source column is referenced as "EMAIL"; this relies on
    # Spark's case-insensitive column resolution if the stored name is "email".
    .withColumn("email", lower(trim(col("EMAIL"))))
    .withColumn("name", initcap(trim(col("name"))))
    # Gender: map f/female and m/male (any case) to canonical labels;
    # every other value falls through to "Other".
    .withColumn("gender", when(lower(col("gender")).isin("f", "female"), "Female")
                          .when(lower(col("gender")).isin("m", "male"), "Male")
                          .otherwise("Other"))
    # Swap "/" for "-" before parsing. to_date is called without an explicit
    # format -- TODO confirm every raw dob value is in a layout it accepts.
    .withColumn("dob", to_date(regexp_replace(col("dob"), "/", "-")))
    .withColumn("location", initcap(col("location")))
    # Drop incomplete rows BEFORE de-duplicating. dropDuplicates keeps an
    # arbitrary row per customer_id; doing it first could keep a duplicate with
    # a null email and then lose the customer entirely in dropna.
    .dropna(subset=["customer_id", "email"])
    .dropDuplicates(["customer_id"])
)
display(customers_clean.limit(6))

# Overwrite the Silver table with the cleaned result.
customers_clean.write.format("delta").mode("overwrite").saveAsTable("silver_customers")



StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ceea8ec9-bdad-48ea-9b45-f735cc5fa3ef)

## Clean orders table

In [14]:
%%sql
-- Preview five rows of the Bronze orders table.
SELECT * FROM orders LIMIT 5

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 16, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 5 fields>

In [16]:
# Build the Silver orders table from the Bronze "orders" Delta table.
orders = spark.table("orders")
orders_clean = (
    orders
    # order_date is matched against several textual layouts; each branch picks
    # the to_date format for one layout and anything else is parsed as
    # yyyy-MM-dd. Patterns are raw strings so "\d" reaches the regex engine
    # without Python treating it as an (invalid) string escape.
    .withColumn("order_date",
                when(col("order_date").rlike(r"^\d{4}/\d{2}/\d{2}$"), to_date(col("order_date"), "yyyy/MM/dd"))
                .when(col("order_date").rlike(r"^\d{2}-\d{2}-\d{4}$"), to_date(col("order_date"), "dd-MM-yyyy"))
                .when(col("order_date").rlike(r"^\d{8}$"), to_date(col("order_date"), "yyyyMMdd"))
                .otherwise(to_date(col("order_date"), "yyyy-MM-dd")))
    .withColumn("amount", col("amount").cast(DoubleType()))
    # Negative amounts are replaced with null rather than dropping the row.
    .withColumn("amount", when(col("amount") < 0, None).otherwise(col("amount")))
    .withColumn("status", initcap(col("status")))
    # Remove incomplete rows first, then keep one row per order_id.
    .dropna(subset=["customer_id", "order_date"])
    .dropDuplicates(["order_id"])
)
display(orders_clean.limit(5))
# Overwrite the Silver table with the cleaned result.
orders_clean.write.format("delta").mode("overwrite").saveAsTable("silver_orders")

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 150d39d5-9a4c-4e30-80d7-11ff74369ddc)

# Clean payments table

In [18]:
%%sql
-- Preview five rows of the Bronze payments table.
SELECT * FROM payments LIMIT 5

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 20, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 6 fields>

In [20]:
# Build the Silver payments table from the Bronze "payments" Delta table.
payments = spark.table("payments")
payments_clean = (
    payments
    # Swap "/" for "-" before parsing. to_date is called without an explicit
    # format -- TODO confirm the raw payment_date layouts.
    .withColumn("payment_date", to_date(regexp_replace(col("payment_date"), "/", "-")))
    .withColumn("payment_method", initcap(col("payment_method")))
    # initcap has already turned any casing of "creditcard" into "Creditcard",
    # so that is the value to match here; the previous lower-case key could
    # never match after initcap.
    .replace({"Creditcard": "Credit Card"}, subset=["payment_method"])
    .withColumn("payment_status", initcap(col("payment_status")))
    .withColumn("amount", col("amount").cast(DoubleType()))
    # Negative amounts become null and are then removed by the dropna below.
    .withColumn("amount", when(col("amount") < 0, None).otherwise(col("amount")))
    .dropna(subset=["customer_id", "payment_date", "amount"])
)
display(payments_clean.limit(5))
# Overwrite the Silver table with the cleaned result.
payments_clean.write.format("delta").mode("overwrite").saveAsTable("silver_payments")


StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 22, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ace06880-da17-460c-8dc6-01e8df3fc64f)

# Clean support tickets table

In [22]:
%%sql
-- Preview five rows of the Bronze support table.
SELECT * FROM support LIMIT 5

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 24, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 5 fields>

In [24]:
# Build the Silver support table from the Bronze "support" Delta table.
support = spark.table("support")
support_clean = (
    support
    # Swap "/" for "-" before parsing. to_date is called without an explicit
    # format -- TODO confirm the raw ticket_date layouts.
    .withColumn("ticket_date", to_date(regexp_replace(col("ticket_date"), "/", "-")))
    .withColumn("issue_type", initcap(trim(col("issue_type"))))
    .withColumn("resolution_status", initcap(trim(col("resolution_status"))))
    # Turn placeholder values into real nulls. After initcap every casing of
    # "NA" reads "Na", so that is the key to match; the previous "NA" key could
    # never match at this point in the chain.
    .replace({"Na": None, "": None}, subset=["issue_type", "resolution_status"])
    # Drop incomplete rows BEFORE de-duplicating. dropDuplicates keeps an
    # arbitrary row per ticket_id, so running it first could keep an incomplete
    # duplicate and lose the ticket entirely in dropna.
    .dropna(subset=["customer_id", "ticket_date"])
    .dropDuplicates(["ticket_id"])
)
display(support_clean.limit(5))

# Overwrite the Silver table with the cleaned result.
support_clean.write.format("delta").mode("overwrite").saveAsTable("silver_support")

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 26, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e7461ee1-77f1-4768-8e52-862886c0c440)

# Clean web activity data

In [26]:
%%sql
-- Preview five rows of the Bronze web table.
SELECT * FROM web LIMIT 5

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 28, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 5 fields>

In [29]:
# Build the Silver web table from the Bronze "web" Delta table.
web = spark.table("web")
web_clean = (
    web
    # Swap "/" for "-" before parsing.
    # NOTE(review): to_date yields a date, so any time-of-day part of
    # session_time is discarded -- confirm that is intended for this column.
    .withColumn("session_time", to_date(regexp_replace(col("session_time"), "/", "-")))
    .withColumn("page_viewed", lower(col("page_viewed")))
    .withColumn("device_type", initcap(col("device_type")))
    # Drop incomplete rows BEFORE de-duplicating. dropDuplicates keeps an
    # arbitrary row per session_id, so running it first could keep an
    # incomplete duplicate and lose the session entirely in dropna.
    .dropna(subset=["customer_id", "session_time", "page_viewed"])
    .dropDuplicates(["session_id"])
)
display(web_clean.limit(5))
# Overwrite the Silver table with the cleaned result.
web_clean.write.format("delta").mode("overwrite").saveAsTable("silver_web")


StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 31, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 258e76f9-46c9-4352-a375-df92b1ab7c63)

# Gold tables – aggregate tables

In [35]:
# Load the Silver tables, each under a short alias so columns can be
# referenced unambiguously after the joins.
cust = spark.table("silver_customers").alias("c")
orders = spark.table("silver_orders").alias("o")
payments = spark.table("silver_payments").alias("p")
support = spark.table("silver_support").alias("s")
web = spark.table("silver_web").alias("w")

# Left-join every related table onto customers by customer_id, in the order
# orders -> payments -> support -> web.
# NOTE(review): the four joins share only customer_id as key, so a customer
# with several rows in more than one of these tables yields one output row per
# combination. Confirm this grain is intended for a "customer 360" table.
joined = cust
for related in (orders, payments, support, web):
    joined = joined.join(related, on="customer_id", how="left")

# Output columns, grouped by the table they come from.
customer_cols = [
    col("c.customer_id"),
    col("c.name"),
    col("c.email"),
    col("c.gender"),
    col("c.dob"),
    col("c.location"),
]
order_cols = [
    col("o.order_id"),
    col("o.order_date"),
    col("o.amount").alias("order_amount"),
    col("o.status").alias("order_status"),
]
payment_cols = [
    col("p.payment_method"),
    col("p.payment_status"),
    col("p.amount").alias("payment_amount"),
]
support_cols = [
    col("s.ticket_id"),
    col("s.issue_type"),
    col("s.ticket_date"),
    col("s.resolution_status"),
]
web_cols = [
    col("w.page_viewed"),
    col("w.device_type"),
    col("w.session_time"),
]

customer360 = joined.select(
    *customer_cols,
    *order_cols,
    *payment_cols,
    *support_cols,
    *web_cols,
)
display(customer360.limit(10))

# Overwrite the Gold table with the combined result.
customer360.write.format("delta").mode("overwrite").saveAsTable("gold_customer360")

StatementMeta(, 14010932-f8db-449d-a723-373eaab66fa4, 37, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9743d765-9a94-498f-891a-5f8283273c90)