In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [0]:
%python
# List the gold directory on the mounted sales data to see which table folders
# are available to load (the output shows dim_customer, dim_product, fact_sales).
dbutils.fs.ls("/mnt/salesData/gold")

[FileInfo(path='dbfs:/mnt/salesData/gold/dim_customer/', name='dim_customer/', size=0, modificationTime=1749582617000),
 FileInfo(path='dbfs:/mnt/salesData/gold/dim_product/', name='dim_product/', size=0, modificationTime=1749582619000),
 FileInfo(path='dbfs:/mnt/salesData/gold/fact_sales/', name='fact_sales/', size=0, modificationTime=1749582620000)]

In [0]:
# Load the three gold tables from CSV. Every file is read with the same
# options: first row is the header, column types are inferred by Spark.
csv_options = {"header": True, "inferSchema": True}

dim_customer = spark.read.csv("dbfs:/mnt/salesData/gold/dim_customer/", **csv_options)
dim_product = spark.read.csv("dbfs:/mnt/salesData/gold/dim_product/", **csv_options)
fact_sales = spark.read.csv("dbfs:/mnt/salesData/gold/fact_sales/", **csv_options)


In [0]:
# Register each DataFrame as a temporary view so the reports below can be
# expressed as Spark SQL queries against view_product / view_customer / view_sales.
for view_name, frame in (
    ("view_product", dim_product),
    ("view_customer", dim_customer),
    ("view_sales", fact_sales),
):
    frame.createOrReplaceTempView(view_name)


In [0]:
# Customer report: one row per customer (customer_key / customer_number / name / age)
# with order, sales and quantity totals, an age group, a value segment and
# average-spend KPIs. Built from view_sales LEFT JOIN view_customer.
#
# Notes on the KPI guards:
#   * age_group stays NULL when the age is unknown (missing birthdate or a sale
#     with no matching customer) instead of falling into '50 and above'.
#   * avg_order_value guards its actual divisor, total_orders.
#   * months_between() yields fractional months, so a lifespan below one month
#     would inflate avg_monthly_spend; such customers report total_sales as-is.
customer_report = spark.sql("""
    WITH base_query AS (
        -- Sales rows that have an order date, enriched with customer attributes.
        SELECT
            sf.order_date,
            sf.product_key,
            sf.order_number,
            sf.sales_amount,
            sf.sales_quantity AS quantity,
            cd.customer_id AS customer_number,
            -- age in whole years as of today
            FLOOR(months_between(current_date(), cd.birthdate) / 12) AS customer_age,
            concat_ws(' ', cd.firstname, cd.lastname) AS customer_name,
            cd.customer_key
        FROM view_sales sf
        LEFT JOIN view_customer cd ON sf.customer_id = cd.customer_id
        WHERE sf.order_date IS NOT NULL
    ),
    customer_aggregation AS (
        -- Collapse the sales rows to one row per customer.
        SELECT
            customer_age,
            customer_name,
            customer_key,
            customer_number,
            count(DISTINCT order_number) AS total_orders,
            sum(sales_amount) AS total_sales,
            count(DISTINCT product_key) AS total_products,
            sum(quantity) AS total_quantity,
            max(order_date) AS last_order_date,
            -- months (fractional) between the first and the last order
            months_between(max(order_date), min(order_date)) AS lifespan
        FROM base_query
        GROUP BY customer_key, customer_number, customer_name, customer_age
    )

    SELECT
        customer_age,
        customer_name,
        customer_key,
        customer_number,
        CASE
            WHEN customer_age IS NULL THEN NULL
            WHEN customer_age < 20 THEN 'under 20'
            WHEN customer_age BETWEEN 20 AND 29 THEN '20-29'
            WHEN customer_age BETWEEN 30 AND 39 THEN '30-39'
            WHEN customer_age BETWEEN 40 AND 49 THEN '40-49'
            ELSE '50 and above'
        END AS age_group,
        CASE
            WHEN lifespan >= 12 AND total_sales > 5000 THEN 'VIP'
            WHEN lifespan >= 12 AND total_sales <= 5000 THEN 'Regular'
            ELSE 'New'
        END AS customer_segment,
        total_orders,
        total_sales,
        total_quantity,
        total_products,
        last_order_date,
        -- whole months since the customer's last order
        FLOOR(months_between(current_date(), last_order_date)) AS recency,
        lifespan,
        -- average order value; guard the divisor so no division by zero can occur
        CASE
            WHEN total_orders = 0 THEN 0
            ELSE round(total_sales / total_orders, 2)
        END AS avg_order_value,
        -- average monthly spend; customers active for less than one month
        -- report their total sales rather than dividing by a fraction of a month
        CASE
            WHEN lifespan < 1 THEN total_sales
            ELSE round(total_sales / lifespan, 2)
        END AS avg_monthly_spend
    FROM customer_aggregation
""")

In [0]:
# Product report: one row per product with order, sales, quantity and customer
# totals, a revenue-based performance segment and average-revenue KPIs.
# Built from view_sales LEFT JOIN view_product.
#
# Notes on the KPI guards:
#   * avg_order_revenue guards its actual divisor, total_orders.
#   * months_between() yields fractional months, so a lifespan below one month
#     would inflate avg_monthly_revenue; such products report total_sales as-is.
product_report = spark.sql(
    """
    WITH base_query AS (
        -- Sales rows that have an order date, enriched with product attributes.
        SELECT
            sf.order_date,
            -- the sales row's customer_id is exposed as customer_key downstream
            sf.customer_id AS customer_key,
            sf.order_number,
            sf.sales_amount,
            sf.sales_quantity AS quantity,
            pd.product_key AS product_number,
            pd.product_name,
            pd.product_category AS category,
            pd.product_subcategory AS subcategory,
            pd.maintenance,
            pd.product_cost AS cost,
            pd.product_line,
            pd.product_id AS product_key
        FROM view_sales sf
        LEFT JOIN view_product pd ON sf.product_key = pd.product_key
        WHERE sf.order_date IS NOT NULL
    ),
    product_aggregation AS (
        -- Collapse the sales rows to one row per product.
        SELECT
            product_number,
            category,
            subcategory,
            product_name,
            product_key,
            cost,
            count(DISTINCT order_number) AS total_orders,
            sum(sales_amount) AS total_sales,
            count(DISTINCT customer_key) AS total_customers,
            sum(quantity) AS total_quantity,
            max(order_date) AS last_order_date,
            -- months (fractional) between the first and the last sale
            months_between(max(order_date), min(order_date)) AS lifespan
        FROM base_query
        GROUP BY product_key, product_number, product_name, category, subcategory, cost
    )

    SELECT
        product_number,
        category,
        subcategory,
        product_name,
        product_key,
        cost,
        CASE
            WHEN total_sales > 50000 THEN 'High-Performer'
            WHEN total_sales >= 10000 THEN 'Mid-Range'
            ELSE 'Low-Performer'
        END AS product_segment,
        total_orders,
        total_sales,
        total_quantity,
        total_customers,
        last_order_date,
        -- whole months since the product was last sold
        FLOOR(months_between(current_date(), last_order_date)) AS recency,
        -- average revenue per order; guard the divisor so no division by zero can occur
        CASE
            WHEN total_orders = 0 THEN 0
            ELSE round(total_sales / total_orders, 2)
        END AS avg_order_revenue,
        -- average monthly revenue; products sold for less than one month
        -- report their total sales rather than dividing by a fraction of a month
        CASE
            WHEN lifespan < 1 THEN total_sales
            ELSE round(total_sales / lifespan, 2)
        END AS avg_monthly_revenue
    FROM product_aggregation
    """
)

In [0]:
# Persist a transformed DataFrame as CSV in the gold directory.
def write_to_gold(df, table_name):
    """Write ``df`` to ``/mnt/salesData/gold/<table_name>`` as CSV with a header row.

    Existing data at the target path is replaced (``overwrite`` mode).

    Args:
        df: DataFrame to persist; only its ``write`` API is used.
        table_name: Name of the target folder under the gold directory.

    Returns:
        True when the write completed, False when it raised. Failures are
        printed rather than propagated, so callers that need to stop on a
        failed write should check the return value.
    """
    try:
        df.write.mode("overwrite").format("csv").option("header", True).save(f"/mnt/salesData/gold/{table_name}")
    except Exception as e:
        # Report the failure (message plus the underlying error) and signal it to the caller.
        print(f"error in writing {table_name} to gold", e)
        return False
    print(f"{table_name} written to gold")
    return True
      

In [0]:
# Persist both reports to the gold directory, customer report first.
for report_df, report_name in (
    (customer_report, 'customer_report'),
    (product_report, 'product_report'),
):
    write_to_gold(report_df, report_name)



customer_report written to gold
product_report written to gold
