In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
%sql
USE CATALOG amazonretail;
USE SCHEMA gold;

In [0]:
df_sales = spark.table("amazonretail.silver.silver_sales")
display(df_sales)

transaction_id,quantity,transaction_date,product_name,category,price,first_name,last_name,email,phone,customer_city,registration_date,store_name,store_location,sales
1,4,2025-03-31,Desk Organizer,Accessories,399.0,Rohit,Mehta,user127@example.com,9621098765,Bangalore,2023-08-30,Downtown Mini Store,Pune,1596.0
2,5,2024-11-12,Yoga Mat,Fitness,499.0,Riya,Singh,user105@example.com,9843210987,Chennai,2023-06-28,Downtown Mini Store,Pune,2495.0
3,3,2025-05-01,Bluetooth Speaker,Electronics,1299.49,Rakesh,Kapoor,user116@example.com,9732109876,Kolkata,2023-06-15,High Street Store,Delhi,3898.47
4,1,2024-11-02,Desk Organizer,Accessories,399.0,Alka,Mishra,user120@example.com,9698765432,Hyderabad,2023-12-01,City Mall Store,Mumbai,399.0
5,1,2025-03-17,Notebook Set,Stationery,149.0,Riya,Singh,user105@example.com,9843210987,Chennai,2023-06-28,High Street Store,Delhi,149.0
6,5,2025-01-04,Smartwatch,Electronics,4999.0,Deepak,Nair,user110@example.com,9798765432,Mumbai,2023-10-14,Tech World Outlet,Bangalore,24995.0
7,5,2025-01-01,Smartwatch,Electronics,4999.0,Deepak,Nair,user110@example.com,9798765432,Mumbai,2023-10-14,High Street Store,Delhi,24995.0
8,2,2025-06-08,Smartwatch,Electronics,4999.0,Bhavna,Nair,user126@example.com,9632109876,Mumbai,2023-07-17,Mega Plaza,Chennai,9998.0
9,2,2024-10-08,Wireless Mouse,Electronics,799.99,Arjun,Yadav,user123@example.com,9665432109,Ahmedabad,2024-02-25,Tech World Outlet,Bangalore,1599.98
10,5,2024-08-27,Bluetooth Speaker,Electronics,1299.49,Kavita,Sharma,user124@example.com,9654321098,Kolkata,2023-11-15,High Street Store,Delhi,6497.45


In [0]:
df_product_agg = df_sales.groupBy('category','product_name').agg(sum('sales').alias("total_sales"))
display(df_product_agg)

category,product_name,total_sales
Accessories,Desk Organizer,9975.0
Fitness,Yoga Mat,4491.0
Electronics,Bluetooth Speaker,10395.92
Stationery,Notebook Set,1043.0
Electronics,Smartwatch,74985.0
Electronics,Wireless Mouse,14399.82
Fitness,Water Bottle,3289.0
Fitness,Dumbbell Set,15992.0


In [0]:
df_store_agg = df_sales.groupBy('store_location','store_name').agg(sum('sales').alias("total_sales"))
display(df_store_agg)

store_location,store_name,total_sales
Pune,Downtown Mini Store,24072.92
Delhi,High Street Store,37333.92
Mumbai,City Mall Store,1692.0
Bangalore,Tech World Outlet,42186.91
Chennai,Mega Plaza,29284.99


In [0]:
df_customer_agg = df_sales.groupBy(concat('first_name',lit(' '),'last_name').alias("customer_name"))\
                          .agg(sum('sales').alias("total_sales"))
display(df_customer_agg)

customer_name,total_sales
Rohit Mehta,1995.0
Riya Singh,6643.95
Rakesh Kapoor,9484.47
Alka Mishra,4398.95
Deepak Nair,49990.0
Bhavna Nair,10596.0
Arjun Yadav,1599.98
Kavita Sharma,6497.45
Nina Joshi,1599.98
Rahul Verma,596.0


In [0]:
df_customer_city_agg = df_sales.groupBy("customer_city")\
                          .agg(sum('sales').alias("total_sales"))
display(df_customer_city_agg)

customer_city,total_sales
Bangalore,8392.97
Chennai,8138.95
Kolkata,16577.92
Hyderabad,6394.95
Mumbai,63381.98
Ahmedabad,9595.98
Delhi,17791.99
Pune,4296.0


In [0]:
df_sales_agg = df_sales.groupBy('transaction_date', 'product_name', 'category', concat('first_name',lit(' '),'last_name').alias("customer_name"), 'customer_city', 'store_name', 'store_location')\
                       .agg(sum('sales').alias("total_sales"), 
                            sum('quantity').alias("total_quantity"))
display(df_sales_agg)

transaction_date,product_name,category,customer_name,customer_city,store_name,store_location,total_sales,total_quantity
2025-03-31,Desk Organizer,Accessories,Rohit Mehta,Bangalore,Downtown Mini Store,Pune,1596.0,4
2024-11-12,Yoga Mat,Fitness,Riya Singh,Chennai,Downtown Mini Store,Pune,2495.0,5
2025-05-01,Bluetooth Speaker,Electronics,Rakesh Kapoor,Kolkata,High Street Store,Delhi,3898.47,3
2024-11-02,Desk Organizer,Accessories,Alka Mishra,Hyderabad,City Mall Store,Mumbai,399.0,1
2025-03-17,Notebook Set,Stationery,Riya Singh,Chennai,High Street Store,Delhi,149.0,1
2025-01-04,Smartwatch,Electronics,Deepak Nair,Mumbai,Tech World Outlet,Bangalore,24995.0,5
2025-01-01,Smartwatch,Electronics,Deepak Nair,Mumbai,High Street Store,Delhi,24995.0,5
2025-06-08,Smartwatch,Electronics,Bhavna Nair,Mumbai,Mega Plaza,Chennai,9998.0,2
2024-10-08,Wireless Mouse,Electronics,Arjun Yadav,Ahmedabad,Tech World Outlet,Bangalore,1599.98,2
2024-08-27,Bluetooth Speaker,Electronics,Kavita Sharma,Kolkata,High Street Store,Delhi,6497.45,5


### Writing all the files to Gold Layer

In [0]:
def write_to_gold_tables(df,table_name):
    df.write.format('delta')\
            .mode('overwrite')\
            .option('overwriteSchema','true')\
            .saveAsTable(f"amazonretail.gold.{table_name}")

In [0]:
write_to_gold_tables(df_product_agg,'gold_product_agg')
write_to_gold_tables(df_store_agg,'gold_store_agg')
write_to_gold_tables(df_customer_agg,'gold_customer_agg')
write_to_gold_tables(df_customer_city_agg,'gold_customer_city_agg')
write_to_gold_tables(df_sales_agg,'gold_sales_agg')

### Writing all the files in Delta Format to Gold Container in ADLS

In [0]:
gold_path = 'abfss://gold@amazonprimeadls.dfs.core.windows.net/'

df_product_agg.write.format('delta').mode('overwrite').option('overwriteSchema','true').save(f"{gold_path}product_agg")
df_store_agg.write.format('delta').mode('overwrite').option('overwriteSchema','true').save(f"{gold_path}store_agg")
df_customer_agg.write.format('delta').mode('overwrite').option('overwriteSchema','true').save(f"{gold_path}customer_agg")
df_customer_city_agg.write.format('delta').mode('overwrite').option('overwriteSchema','true').save(f"{gold_path}customer_city_agg")
df_sales_agg.write.format('delta').mode('overwrite').option('overwriteSchema','true').save(f"{gold_path}sales_agg")