**READ THE BRONZE PARQUET FILES INTO DATAFRAMES**

In [1]:
# Root folder of the bronze parquet files in the Fabric lakehouse; edit here
# (once) to point the whole notebook at a different workspace/lakehouse.
BRONZE_PATH = (
    "abfss://ecommerse_workspace_praveen_aluru@onelake.dfs.fabric.microsoft.com/"
    "ecommerse_lakehouse_praveen_aluru.Lakehouse/Files/bronze"
)


def read_bronze(file_name):
    """Read ``<BRONZE_PATH>/<file_name>.parquet`` into a Spark DataFrame.

    Args:
        file_name: parquet file name without the ``.parquet`` extension.

    Returns:
        The DataFrame returned by ``spark.read.parquet`` for that file.
    """
    return spark.read.parquet(f"{BRONZE_PATH}/{file_name}.parquet")


customers_raw_data = read_bronze("customers")
orders_raw_data = read_bronze("orders")
payments_raw_data = read_bronze("payments")
support_raw_data = read_bronze("support_tickets")
web_raw_data = read_bronze("web_activities")

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 3, Finished, Available, Finished)

**DISPLAYING BRONZE DATA FOR CUSTOMERS_RAW_DATA**

In [2]:
# Preview up to five bronze customer rows (no ordering is applied).
display(customers_raw_data.limit(num=5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 630a6946-61a4-45bd-8e7f-0a5be9e1ce3c)

**DISPLAYING BRONZE DATA FOR ORDERS_RAW_DATA**

In [3]:
# Preview up to five bronze order rows (no ordering is applied).
display(orders_raw_data.limit(num=5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5f772521-e452-47b6-8226-1e62b2abf2ec)

**DISPLAYING BRONZE DATA FOR PAYMENTS_RAW_DATA**

In [6]:
# Preview up to five bronze payment rows (no ordering is applied).
display(payments_raw_data.limit(num=5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a7d1552b-e31a-4f8f-b7c7-98a14fa4a347)

****

**DISPLAYING BRONZE DATA FOR SUPPORT_RAW_DATA**

In [8]:
# Preview up to five bronze support-ticket rows (no ordering is applied).
display(support_raw_data.limit(num=5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 77d58d27-2910-4188-93e0-e50fc23e9643)

****

**DISPLAYING BRONZE DATA FOR WEB_RAW_DATA**

In [9]:
# Preview up to five bronze web-activity rows (no ordering is applied).
display(web_raw_data.limit(num=5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4971f347-2cda-4b52-8d6f-a23ad26c0e90)

**CREATE DELTA BRONZE TABLES**

In [10]:
# Land each raw DataFrame as a managed Delta table, replacing the previous
# contents on every run. Table name -> source DataFrame, written in this order.
# NOTE(review): these bronze tables carry no "bronze_" prefix, unlike the
# "silver_*" tables below; renaming would affect any downstream readers.
bronze_tables = {
    "customers": customers_raw_data,
    "orders": orders_raw_data,
    "payments": payments_raw_data,
    "support": support_raw_data,
    "webactivity": web_raw_data,
}
for table_name, raw_df in bronze_tables.items():
    raw_df.write.saveAsTable(table_name, format="delta", mode="overwrite")

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 12, Finished, Available, Finished)

**CLEANED THE DATA - CUSTOMERS_RAW_DATA**

In [12]:
# Re-show up to five raw customer rows as the "before" view for the cleaning
# step that follows.
display(customers_raw_data.limit(num=5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 14, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 99e212ed-d7fc-4ea3-91f3-be12af08278e)

In [13]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Silver-layer cleaning for customers: normalise the text columns, map gender
# codes to labels, parse dob, then drop unusable rows and duplicate ids.
customers_cleaned = (
    customers_raw_data
    .withColumn("email", lower(trim(col("email"))))
    # Blank emails count as missing so the dropna below removes them too.
    .withColumn("email", when(col("email") != "", col("email")))
    .withColumn("name", initcap(trim(col("name"))))
    # Trim before matching so values such as " F " are still recognised;
    # anything unrecognised (including null) becomes "Other".
    .withColumn("gender", when(lower(trim(col("gender"))).isin("f", "female"), "Female")
                          .when(lower(trim(col("gender"))).isin("m", "male"), "Male")
                          .otherwise("Other"))
    # NOTE(review): "/" is rewritten to "-" and the result is parsed with
    # Spark's default date format, so dob strings in other layouts
    # (e.g. day-first) will likely come out null — confirm the source format.
    .withColumn("dob", to_date(regexp_replace(col("dob"), "/", "-")))
    .withColumn("location", initcap(trim(col("location"))))
    # Drop incomplete rows *before* de-duplicating. Otherwise an incomplete
    # copy of a repeated customer_id could be the row kept by dropDuplicates
    # and then discarded, losing the customer entirely.
    .dropna(subset=["customer_id", "email"])
    .dropDuplicates(["customer_id"])
)
display(customers_cleaned.limit(5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3c817899-3928-40c3-ac83-67cd6e46126c)

**WRITE INTO MANAGED TABLE**

In [14]:
# Persist the cleaned customers as a managed Delta table, replacing prior contents.
customers_cleaned.write.saveAsTable("silver_customers", format="delta", mode="overwrite")

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 16, Finished, Available, Finished)

**CLEANED THE DATA - ORDERS_RAW_DATA**

In [16]:
# Re-show up to five raw order rows as the "before" view for the cleaning step.
display(orders_raw_data.limit(num=5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8581d72c-1d01-4ec6-b0e6-99e20caea25f)

In [18]:
# Silver-layer cleaning for orders. order_date arrives in several string
# layouts, so each known layout is matched explicitly before parsing; anything
# else is parsed as yyyy-MM-dd. Raw strings keep the regex "\d" escapes intact
# without Python's invalid-escape warning.
# NOTE(review): the "dd-dd-dddd" layout is assumed to be day-first — confirm.
orders_cleaned = (
    orders_raw_data
    .withColumn("order_date",
                when(col("order_date").rlike(r"^\d{4}/\d{2}/\d{2}$"), to_date(col("order_date"), "yyyy/MM/dd"))
                .when(col("order_date").rlike(r"^\d{2}-\d{2}-\d{4}$"), to_date(col("order_date"), "dd-MM-yyyy"))
                .when(col("order_date").rlike(r"^\d{8}$"), to_date(col("order_date"), "yyyyMMdd"))
                .otherwise(to_date(col("order_date"), "yyyy-MM-dd")))
    .withColumn("amount", col("amount").cast(DoubleType()))
    # Negative amounts are treated as invalid: nulled, but the row is kept.
    .withColumn("amount", when(col("amount") < 0, lit(None)).otherwise(col("amount")))
    .withColumn("status", initcap(trim(col("status"))))
    # dropna before dropDuplicates so an incomplete duplicate is never the one kept.
    .dropna(subset=["customer_id", "order_date"])
    .dropDuplicates(["order_id"])
)
display(orders_cleaned.limit(5))


StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a5c09f88-656a-467f-b2a2-205ccfbc92e3)

**WRITE INTO MANAGED TABLE**

In [19]:
# Persist the cleaned orders as a managed Delta table, replacing prior contents.
orders_cleaned.write.saveAsTable("silver_orders", format="delta", mode="overwrite")

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 21, Finished, Available, Finished)

**CLEANED THE DATA - PAYMENTS_RAW_DATA**

In [20]:
# Re-show up to five raw payment rows as the "before" view for the cleaning step.
display(payments_raw_data.limit(num=5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 22, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c963bf76-8b92-4ec6-94da-dda029d44ca3)

In [21]:
# Silver-layer cleaning for payments: parse the date, normalise method/status
# labels, null out negative amounts, and drop rows missing key fields.
payments_cleaned = (
    payments_raw_data
    .withColumn("payment_date", to_date(regexp_replace(col("payment_date"), "/", "-")))
    # Lower-case first so the "creditcard" mapping matches whatever the raw
    # casing was ("CreditCard", "CREDITCARD", ...). initcap must run AFTER the
    # mapping: initcap("creditcard") is "Creditcard", which would never match.
    .withColumn("payment_method", lower(trim(col("payment_method"))))
    .replace({"creditcard": "credit card"}, subset=["payment_method"])
    .withColumn("payment_method", initcap(col("payment_method")))
    .withColumn("payment_status", initcap(trim(col("payment_status"))))
    .withColumn("amount", col("amount").cast(DoubleType()))
    # Negative amounts are invalid; null them so the dropna below removes the row.
    .withColumn("amount", when(col("amount") < 0, lit(None)).otherwise(col("amount")))
    .dropna(subset=["customer_id", "payment_date", "amount"])
)
display(payments_cleaned.limit(5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 23, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e9fbd255-77a8-4b25-91a9-a14c349c91f9)

**WRITE INTO MANAGED TABLE**

In [22]:
# Persist the cleaned payments as a managed Delta table, replacing prior contents.
payments_cleaned.write.saveAsTable("silver_payments", format="delta", mode="overwrite")

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 24, Finished, Available, Finished)

****

**CLEANED THE DATA - SUPPORT_RAW_DATA**

In [24]:
# Re-show up to five raw support-ticket rows as the "before" view for cleaning.
display(support_raw_data.limit(num=5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 26, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 08098dd0-6c72-432a-87d6-5b86febd8f5e)

In [25]:
def _null_if_na_or_blank(column_name):
    """Trim ``column_name``; map "NA" (any case) and "" to null; title-case the rest.

    The placeholder check runs on the trimmed raw value, before ``initcap``,
    because ``initcap("NA")`` yields "Na" and would no longer match "NA".
    Null input stays null.
    """
    trimmed = trim(col(column_name))
    return when(upper(trimmed).isin("NA", ""), lit(None)).otherwise(initcap(trimmed))


# Silver-layer cleaning for support tickets.
support_cleaned = (
    support_raw_data
    .withColumn("ticket_date", to_date(regexp_replace(col("ticket_date"), "/", "-")))
    .withColumn("issue_type", _null_if_na_or_blank("issue_type"))
    .withColumn("resolution_status", _null_if_na_or_blank("resolution_status"))
    # Drop incomplete rows *before* de-duplicating so an incomplete copy of a
    # repeated ticket_id cannot be the kept row and then discarded.
    .dropna(subset=["customer_id", "ticket_date"])
    .dropDuplicates(["ticket_id"])
)
display(support_cleaned.limit(5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 27, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0b71ee04-d98e-4f8c-aa59-b6e1bb6f9960)

**WRITE INTO MANAGED TABLE**

In [26]:
# Persist the cleaned support tickets as a managed Delta table, replacing prior contents.
support_cleaned.write.saveAsTable("silver_support", format="delta", mode="overwrite")

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 28, Finished, Available, Finished)

**CLEANED THE DATA - WEB_RAW_DATA**

In [27]:
# Re-show up to five raw web-activity rows as the "before" view for cleaning.
display(web_raw_data.limit(num=5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 29, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e15b4b15-7166-44b6-920d-ff4832da5e5f)

In [28]:
# Silver-layer cleaning for web activity: parse the session date, normalise
# page/device labels, then drop unusable rows and duplicate sessions.
web_cleaned = (
    web_raw_data
    # NOTE(review): to_date returns a DateType, so any time-of-day carried in
    # session_time is discarded here — confirm that is intended for this column.
    .withColumn("session_time", to_date(regexp_replace(col("session_time"), "/", "-")))
    .withColumn("page_viewed", lower(trim(col("page_viewed"))))
    .withColumn("device_type", initcap(trim(col("device_type"))))
    # Drop incomplete rows *before* de-duplicating so an incomplete copy of a
    # repeated session_id cannot be the kept row and then discarded.
    .dropna(subset=["customer_id", "session_time", "page_viewed"])
    .dropDuplicates(["session_id"])
)
display(web_cleaned.limit(5))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 30, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2c0af60d-4471-458c-84e6-fc26db16e3cc)

**WRITE INTO MANAGED TABLE**

In [29]:
# Persist the cleaned web activity as a managed Delta table, replacing prior contents.
web_cleaned.write.saveAsTable("silver_web", format="delta", mode="overwrite")

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 31, Finished, Available, Finished)

**BUILD THE GOLD TABLE (LEFT-JOIN THE SILVER TABLES ON CUSTOMER_ID)**

In [30]:
# Load each silver table under a short alias; the aliases qualify the columns
# selected below (c = customers, o = orders, p = payments, s = support, w = web).
customers = spark.table("silver_customers").alias("c")
orders = spark.table("silver_orders").alias("o")
payments = spark.table("silver_payments").alias("p")
support = spark.table("silver_support").alias("s")
web = spark.table("silver_web").alias("w")

# Left-join every child table onto customers, one after another, on customer_id.
# NOTE(review): the child tables are joined on customer_id only, not to each
# other. A customer with several rows in more than one child table therefore
# gets one output row per combination, and order/payment amounts repeat across
# rows. Do not sum them directly from this table; confirm this grain is intended.
gold_joined = customers
for child_table in (orders, payments, support, web):
    gold_joined = gold_joined.join(child_table, "customer_id", "left")

gold_columns = [
    # customer attributes
    col("c.customer_id"),
    col("c.name"),
    col("c.email"),
    col("c.gender"),
    col("c.dob"),
    col("c.location"),
    # order attributes
    col("o.order_id"),
    col("o.order_date"),
    col("o.amount").alias("order_amount"),
    col("o.status").alias("order_status"),
    # payment attributes
    col("p.payment_method"),
    col("p.payment_status"),
    col("p.amount").alias("payment_amount"),
    # support-ticket attributes
    col("s.ticket_id"),
    col("s.issue_type"),
    col("s.ticket_date"),
    col("s.resolution_status"),
    # web-activity attributes
    col("w.page_viewed"),
    col("w.device_type"),
    col("w.session_time"),
]
ecommerse_cleaned_data = gold_joined.select(*gold_columns)
display(ecommerse_cleaned_data.limit(10))

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 32, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0538ee3c-477a-4e2d-be8d-b529441eabf6)

**WRITE INTO MANAGED TABLE**

In [32]:
# Persist the joined gold dataset as a managed Delta table, replacing prior contents.
ecommerse_cleaned_data.write.saveAsTable("business_gold_table", format="delta", mode="overwrite")

StatementMeta(, 8aee85ff-2afd-4a38-a28a-9ecf082b97d3, 34, Finished, Available, Finished)

In [1]:
%%sql
-- Spot-check up to five rows of the gold table.
SELECT *
FROM business_gold_table
LIMIT 5

StatementMeta(, 7cba7cfa-b262-4fc6-8ee7-40bac37aaf50, 2, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 20 fields>