In [0]:
# File paths for CSV files
customers_file_path = "dbfs:/FileStore/shared_uploads/riakundra2003@gmail.com/customers.csv"
order_items_file_path = "dbfs:/FileStore/shared_uploads/riakundra2003@gmail.com/order_items.csv"
orders_file_path = "dbfs:/FileStore/shared_uploads/riakundra2003@gmail.com/orders.csv"
product_file_path = "dbfs:/FileStore/shared_uploads/riakundra2003@gmail.com/product.csv"

# Load CSV files into DataFrames
customers_df = spark.read.csv(customers_file_path, header=True, inferSchema=True)
order_items_df = spark.read.csv(order_items_file_path, header=True, inferSchema=True)
orders_df = spark.read.csv(orders_file_path, header=True, inferSchema=True)
product_df = spark.read.csv(product_file_path, header=True, inferSchema=True)

# Display first few rows to ensure proper load
customers_df.show()
order_items_df.show()
orders_df.show()
product_df.show()


+---+-----------------+----------------+----------+----------+-------------+----------+--------------------+------+
| id|     customername|           state|      city|created_on|date_of_birth|updated_on|               email|gender|
+---+-----------------+----------------+----------+----------+-------------+----------+--------------------+------+
|267|      Mala Pratap|  Madhya Pradesh|    Indore|2018-12-06|   1983-11-04|2018-12-06|Mala Pratap@outlo...|     f|
| 59|          Anudeep|  Madhya Pradesh|    Indore|2018-08-26|   1978-09-09|2018-08-26|                null|  null|
|273|    Shakshi Sagar|        Nagaland|    Kohima|2018-04-17|   1996-11-06|2019-03-27|     Sagar@gmail.com|     f|
|116|     Ekta Chauhan|  Madhya Pradesh|    Indore|2018-06-28|   1987-04-20|2018-06-28|Ekta Chauhan@outl...|     f|
| 92|         Bhutekar|  Madhya Pradesh|    Indore|2019-01-04|   1989-10-08|2019-01-04|                null|  null|
| 48|    Anjali Juneja|           Delhi|     Delhi|2019-02-01|   1987-11

In [0]:
# Define paths for Bronze Delta tables
bronze_customers_path = "/bronze/customers"
bronze_order_items_path = "/bronze/order_items"
bronze_orders_path = "/bronze/orders"
bronze_product_path = "/bronze/product"

# Save DataFrames to Delta tables in the Bronze layer
customers_df.write.format("delta").mode("overwrite").save(bronze_customers_path)
order_items_df.write.format("delta").mode("overwrite").save(bronze_order_items_path)
orders_df.write.format("delta").mode("overwrite").save(bronze_orders_path)
product_df.write.format("delta").mode("overwrite").save(bronze_product_path)


In [0]:
# Importing functions module from PySpark
from pyspark.sql import functions as F

# Checking for null values in individual columns
customers_null_count = customers_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in customers_df.columns])
order_items_null_count = order_items_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in order_items_df.columns])
orders_null_count = orders_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in orders_df.columns])
product_null_count = product_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in product_df.columns])

# Show the null count for each DataFrame
customers_null_count.show()
order_items_null_count.show()
orders_null_count.show()
product_null_count.show()

# Checking for duplicates
# Checking for duplicates - replace 'order_id' with the correct column name 'id'
customers_dup_count = customers_df.groupBy("id").count().filter("count > 1").count()

# Correcting for `order_items_df` (use 'product_id' or combination as needed)
order_items_dup_count = order_items_df.groupBy("product_id").count().filter("count > 1").count()

# Check for duplicates in the orders DataFrame (use 'id' instead of 'order_id')
orders_dup_count = orders_df.groupBy("id").count().filter("count > 1").count()

# Check for duplicates in the products DataFrame
product_dup_count = product_df.groupBy("id").count().filter("count > 1").count()

# Display the counts of duplicates
print(f"Duplicate customers: {customers_dup_count}")
print(f"Duplicate order items: {order_items_dup_count}")
print(f"Duplicate orders: {orders_dup_count}")
print(f"Duplicate products: {product_dup_count}")


+---+------------+-----+----+----------+-------------+----------+-----+------+
| id|customername|state|city|created_on|date_of_birth|updated_on|email|gender|
+---+------------+-----+----+----------+-------------+----------+-----+------+
|  0|           0|    0|   0|         0|            0|         0|   20|   158|
+---+------------+-----+----+----------+-------------+----------+-----+------+

+--------+------+------+--------+----------+
|order_id|amount|profit|quantity|product_id|
+--------+------+------+--------+----------+
|       0|     0|     0|       0|         0|
+--------+------+------+--------+----------+

+---+----------+-----------+
| id|order_date|customer_id|
+---+----------+-----------+
|  0|         0|          0|
+---+----------+-----------+

+---+--------+-------+
| id|category|product|
+---+--------+-------+
|  0|       0|      0|
+---+--------+-------+

Duplicate customers: 0
Duplicate order items: 17
Duplicate orders: 0
Duplicate products: 0


In [0]:
# Step 1: Data Cleaning

# Example cleaning: Remove rows with null values in critical columns
customers_cleaned = customers_df.na.drop(subset=["id", "customername", "email"])
orders_cleaned = orders_df.na.drop(subset=["id", "customer_id"])
order_items_cleaned = order_items_df.na.drop(subset=["order_id", "product_id", "quantity"])
product_cleaned = product_df.na.drop(subset=["id", "product"])

# Step 2: Join Tables

# Join orders with customers to get customer details for each order
orders_with_customers = orders_cleaned.join(
    customers_cleaned,
    orders_cleaned.customer_id == customers_cleaned.id,
    "left"
).select(orders_cleaned.id.alias("order_id"), "customer_id", "order_date", "customername", "email", "state", "city")


# Join the result with order items to get item details for each order
orders_with_items = orders_with_customers.join(
    order_items_cleaned,
    orders_with_customers.order_id == order_items_cleaned.order_id,
    "left"
).select(orders_with_customers.order_id, "customer_id", "order_date", "customername", "email", 
           order_items_cleaned.product_id, "quantity", "profit", "amount","state", "city")


# Join with products to get product details
final_silver_df = orders_with_items.join(
    product_cleaned,
    orders_with_items.product_id == product_cleaned.id,
    "left"
).select(orders_with_items.order_id, orders_with_items.customer_id, "order_date", "customername", "email", 
           product_cleaned.product.alias("product_name"), "category", 
           "quantity", "profit", "amount", "state", "city")


silver_table_path = "/silver/transformed_orders13"

# Write the DataFrame to Delta format
final_silver_df.write.format("delta").mode("overwrite").save(silver_table_path)


In [0]:
final_silver_df.show()

+--------+-----------+----------+-------------+--------------------+------------+-----------+--------+-------+------+--------------+---------+
|order_id|customer_id|order_date| customername|               email|product_name|   category|quantity| profit|amount|         state|     city|
+--------+-----------+----------+-------------+--------------------+------------+-----------+--------+-------+------+--------------+---------+
| B-25709|          1|2018-07-01|Aakanksha D/O|Aakanksha D/O@out...|      Chairs|  Furniture|       1|   -6.0|  41.0|Madhya Pradesh|   Indore|
| B-25709|          1|2018-07-01|Aakanksha D/O|Aakanksha D/O@out...|       Saree|   Clothing|       7|  -12.0|  33.0|Madhya Pradesh|   Indore|
| B-26081|          2|2019-03-22|      Aarushi|   Aarushi@gmail.com|   Bookcases|  Furniture|       5| -338.0| 359.0|    Tamil Nadu|  Chennai|
| B-26081|          2|2019-03-22|      Aarushi|   Aarushi@gmail.com| Accessories|Electronics|       3|    0.0| 169.0|    Tamil Nadu|  Chennai|

In [0]:
# Step 1: Read Data from Silver Table
silver_table_path = "/silver/transformed_orders13"
silver_df = spark.read.format("delta").load(silver_table_path)
# Display the schema of the DataFrame
silver_df.printSchema()

#create a database for gold layer
spark.sql("CREATE DATABASE IF NOT EXISTS gold_db3")

# Step 2: Perform  required Aggregations and save in the presentation layer

# 1 Total profit by product category
total_profit_by_category = silver_df.groupBy("category").agg(
    F.sum("profit").alias("total_profit")
).orderBy("category")

total_profit_by_category.write.format("delta").mode("overwrite").saveAsTable("gold_db3.total_profit_by_category")

# 2 Number of orders placed by customers in each city instead of state
orders_by_state = final_silver_df.groupBy("state").agg(
    F.countDistinct("order_id").alias("num_orders")
)
orders_by_state = orders_by_state.na.drop(subset=["state"])

orders_by_state.write.format("delta").mode("overwrite").saveAsTable("gold_db3.orders_by_state")

# 3 Total amount spent by each customer
total_spent_by_customer = silver_df.groupBy("customer_id").agg(
    F.sum("amount").alias("total_spent")
).orderBy("customer_id")

total_spent_by_customer.write.format("delta").mode("overwrite").saveAsTable("gold_db3.total_spent_by_customer")

#df = spark.read.format("delta").load("/gold/total_spent_by_customer")
#print(df.dtypes)
#print(total_spent_by_customer.dtypes)

# 4 Average profit per order for each city
average_profit_per_order_by_city = final_silver_df.groupBy("city").agg(
    (F.sum("profit") / F.countDistinct("order_id")).alias("average_profit_per_order")
)

average_profit_per_order_by_city = average_profit_per_order_by_city.na.drop(subset=["city"])

average_profit_per_order_by_city.write.format("delta").mode("overwrite").saveAsTable("gold_db3.average_profit_per_order_by_city")

# 5 Top 5 customers who spent the most
top_customers = silver_df.groupBy("customer_id").agg(
    F.sum("amount").alias("total_spent")
).orderBy(F.desc("total_spent")).limit(5)

top_customers.write.format("delta").mode("overwrite").saveAsTable("gold_db3.top_customers")

# 6 Total revenue generated by each product
total_revenue_by_product = silver_df.groupBy("product_name").agg(
    F.sum("amount").alias("total_revenue")
).orderBy("product_name")

total_revenue_by_product.write.format("delta").mode("overwrite").saveAsTable("gold_db3.total_revenue_by_product")

# 7 Number of orders placed in each product category
orders_by_product_category = silver_df.groupBy("category").agg(
    F.countDistinct("order_id").alias("order_count")
).orderBy("category")

orders_by_product_category.write.format("delta").mode("overwrite").saveAsTable("gold_db3.orders_by_product_category")

# 8 Average profit per order for each product category
average_profit_per_order_by_category = silver_df.groupBy("category").agg(
    F.avg("profit").alias("average_profit_per_order")
).orderBy("category")

average_profit_per_order_by_category.write.format("delta").mode("overwrite").saveAsTable("gold_db3.average_profit_per_order_by_category")

# 9 Total amount spent in each city
total_amount_by_city = final_silver_df.groupBy("city").agg(
    F.sum("amount").alias("total_amount_spent")
)

total_amount_by_city = total_amount_by_city.na.drop(subset=["city"])

total_amount_by_city.write.format("delta").mode("overwrite").saveAsTable("gold_db3.total_amount_by_city")

# 10 Number of orders placed for each product
orders_by_product = silver_df.groupBy("product_name").agg(
    F.countDistinct("order_id").alias("order_count")
).orderBy("product_name")

orders_by_product.write.format("delta").mode("overwrite").saveAsTable("gold_db3.orders_by_product")


root
 |-- order_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- customername: string (nullable = true)
 |-- email: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- profit: double (nullable = true)
 |-- amount: double (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)



In [0]:
top_customers.show()
top_customers.dtypes

+-----------+-----------+
|customer_id|total_spent|
+-----------+-----------+
|        397|     9177.0|
|        360|     6611.0|
|        343|     6339.0|
|        129|     6026.0|
|        239|     5809.0|
+-----------+-----------+

Out[13]: [('customer_id', 'int'), ('total_spent', 'double')]

In [0]:
total_profit_by_category.show()

+-----------+------------+
|   category|total_profit|
+-----------+------------+
|   Clothing|     11163.0|
|Electronics|     10494.0|
|  Furniture|      2298.0|
+-----------+------------+

