In [0]:
# All required imports
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
# ------------------------------------------------------------------------------
# Raw table loading
# ------------------------------------------------------------------------------
# Source of truth for this notebook: the dimension sources and the fact source
# in the cells below are all selected from this DataFrame.
raw_table = (
    spark.read
    .format('delta')
    .table('e_comm_data_modeling.default.raw_order')
)

In [0]:
#=========================================================================================================================
# Customer dimension (SCD Type 1: matched rows are overwritten in place)
#
# Rows without a customer_id are dropped before the merge. A NULL key never
# satisfies the `=` merge condition, so such rows would count as "not matched"
# and be inserted again on every run of this cell.
customer_source = (
    raw_table
    .select('customer_id', 'customer_name')
    .dropna(subset=['customer_id'])
    .distinct()
)
# NOTE(review): distinct() works on (customer_id, customer_name) pairs. If the raw
# data can hold more than one name for the same customer_id, the merge below will
# receive several source rows for one target row -- confirm the feed guarantees a
# single name per customer.

spark.sql('''
CREATE TABLE IF NOT EXISTS e_comm_data_modeling.dim_fact_data.customer(
customer_id string ,
customer_name string )
USING DELTA 
TBLPROPERTIES(QUALITY='SILVER')
''')

# DeltaTable.forName only succeeds when the target is a Delta table.
customer_table = 'e_comm_data_modeling.dim_fact_data.customer'
cust_target = DeltaTable.forName(spark, customer_table)

# Upsert on the business key: update every column on a match, insert otherwise.
merge_cond = 'target.customer_id = source.customer_id'
(
    cust_target.alias('target')
    .merge(customer_source.alias('source'), merge_cond)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

#=========================================================================================================================


In [0]:
#=========================================================================================================================
# Product dimension (SCD Type 1: matched rows are overwritten in place)
#
# Rows without a product_id are dropped before the merge. A NULL key never
# satisfies the `=` merge condition, so such rows would count as "not matched"
# and be inserted again on every run of this cell.
product_source = (
    raw_table
    .select('product_id', 'product_name', 'category')
    .dropna(subset=['product_id'])
    .distinct()
)
# NOTE(review): distinct() works on the full (product_id, product_name, category)
# tuple. If the raw data can hold several name/category variants for one
# product_id, the merge below will receive several source rows for one target
# row -- confirm the feed guarantees one variant per product.

spark.sql('''
CREATE TABLE IF NOT EXISTS e_comm_data_modeling.dim_fact_data.product(
product_id string ,
product_name string,
category string )
USING DELTA 
TBLPROPERTIES(QUALITY='SILVER')
''')

# DeltaTable.forName only succeeds when the target is a Delta table.
product_table = 'e_comm_data_modeling.dim_fact_data.product'
product_target = DeltaTable.forName(spark, product_table)

# Upsert on the business key: update every column on a match, insert otherwise.
merge_cond = 'target.product_id = source.product_id'
(
    product_target.alias('target')
    .merge(product_source.alias('source'), merge_cond)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
#=========================================================================================================================

In [0]:
#=========================================================================================================================
# Address dimension (SCD Type 1), keyed on the (customer_id, address_id) pair
address_source = (
    raw_table
    .select('customer_id', 'address_id')
    .dropna(how='all')   # only rows where BOTH columns are NULL are removed
    .distinct()
)

spark.sql('''
CREATE TABLE IF NOT EXISTS e_comm_data_modeling.dim_fact_data.address(
customer_id string ,
address_id string
)
USING DELTA 
TBLPROPERTIES(QUALITY='SILVER')
''')

# DeltaTable.forName only succeeds when the target is a Delta table.
address_table = 'e_comm_data_modeling.dim_fact_data.address'
address_target = DeltaTable.forName(spark, address_table)

# Null-safe equality (<=>) is used because the source may still contain rows with
# one NULL key column. With plain `=`, a NULL never compares equal, so those rows
# would be treated as new and inserted again on every run.
merge_cond = 'target.customer_id <=> source.customer_id and target.address_id <=> source.address_id'
(
    address_target.alias('target')
    .merge(address_source.alias('source'), merge_cond)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
#========================================================================================================================

In [0]:
#=========================================================================================================================
# Seller dimension (SCD Type 1), keyed on the (seller_id, product_id) pair
seller_source = (
    raw_table
    .select('seller_id', 'product_id')
    .dropna(how='all')   # only rows where BOTH columns are NULL are removed
    .distinct()
)

spark.sql('''
CREATE TABLE IF NOT EXISTS e_comm_data_modeling.dim_fact_data.seller(
seller_id string ,
product_id string
)
USING DELTA 
TBLPROPERTIES(QUALITY='SILVER')
''')

# DeltaTable.forName only succeeds when the target is a Delta table.
seller_table = 'e_comm_data_modeling.dim_fact_data.seller'
seller_target = DeltaTable.forName(spark, seller_table)

# Null-safe equality (<=>) is used because the source may still contain rows with
# one NULL key column. With plain `=`, a NULL never compares equal, so those rows
# would be treated as new and inserted again on every run.
merge_cond = 'target.seller_id <=> source.seller_id and target.product_id <=> source.product_id'
(
    seller_target.alias('target')
    .merge(seller_source.alias('source'), merge_cond)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
#=========================================================================================================================

In [0]:
#=============================================================DATE DIM===================================================
# Collect every date referenced by an order, either as order date or as delivery date.
date_source1 = raw_table.select('order_date').dropna().distinct()
date_source2 = raw_table.select('delivery_date').dropna().distinct()

# union() matches columns by position and keeps duplicates, so:
#   * the combined column keeps the name of the first input ('order_date');
#   * a date present on both sides would appear twice.
# distinct() brings it back to one row per date. Without it the merge on date_key
# would insert both copies into an empty target, and on later runs would see two
# source rows matching the same target row, which Delta rejects.
all_dates = date_source1.union(date_source2).distinct()

date_source = (
    all_dates
    # Surrogate key: the date string with dashes removed, e.g. 2024-01-05 -> 20240105.
    .withColumn('date_key', F.regexp_replace(F.col('order_date'), '-', '').cast('int'))
    .withColumn('full_date', F.to_date(F.col('order_date'), 'yyyy-MM-dd'))
    .withColumn('day', F.dayofmonth(F.col('full_date')))
    .withColumn('month', F.month(F.col('full_date')))
    .withColumn('year', F.year(F.col('full_date')))
    # dayofweek numbering: 1 = Sunday ... 7 = Saturday.
    .withColumn('weekday', F.dayofweek(F.col('full_date')))
    .withColumn('is_weekend', F.dayofweek(F.col('full_date')).isin([1, 7]).cast('int'))
    # The raw string column is only needed to derive the fields above.
    .drop('order_date')
)



In [0]:

# Date dimension: create the table if needed, then upsert (SCD Type 1).
date_table = 'e_comm_data_modeling.dim_fact_data.date_table'

spark.sql(f'''
CREATE TABLE IF NOT EXISTS {date_table}(
date_key integer ,
full_date date ,
day integer ,
month integer ,
year integer ,
weekday integer ,
is_weekend integer

)
USING DELTA 
TBLPROPERTIES(QUALITY='SILVER')
''')

# DeltaTable.forName only succeeds when the target is a Delta table.
date_target = DeltaTable.forName(spark, date_table)

# Match on the integer surrogate key; refresh all columns on a match, insert otherwise.
merge_cond = 'target.date_key = source.date_key'
(
    date_target.alias('target')
    .merge(date_source.alias('source'), merge_cond)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

#=========================================================================================================================

In [0]:
# spark.sql('''select * from e_comm_data_modeling.dim_fact_data.date_table''').count()

In [0]:
#=========================================================================================================================
# Order-item FACT table: source preparation
#
# Only exact duplicate rows are removed (distinct()). Rows that repeat the same
# order_id / product_id / customer_id are kept on purpose: collapsing them to one
# row per key would lose the cancellation records, so they are loaded as-is.
orders_source = (
    raw_table
    .select(
        'order_id', 'product_id', 'seller_id', 'customer_id',
        'address_id', 'order_date', 'delivery_date', 'quantity',
        'unit_price', 'discount', 'is_cancelled', 'is_returned',
    )
    .dropna(how='all')
    .distinct()
    .withColumn('order_date', F.to_date(F.col('order_date'), 'yyyy-MM-dd'))
    .withColumn('delivery_date', F.to_date(F.col('delivery_date'), 'yyyy-MM-dd'))
    # Absolute number of days between order and delivery. A missing delivery
    # date falls back to the order date, which gives a duration of 0.
    .withColumn(
        'duration',
        F.abs(
            F.datediff(
                F.col('order_date'),
                F.coalesce(F.col('delivery_date'), F.col('order_date')),
            )
        ),
    )
    # NOTE(review): a NULL discount makes total_amount NULL -- confirm whether a
    # missing discount should be treated as 0.
    .withColumn('total_amount', (F.col('quantity') * F.col('unit_price')) - F.col('discount'))
)

In [0]:
# Order-item FACT table: create the table if needed, then append the prepared rows.
fact_table = 'e_comm_data_modeling.dim_fact_data.fact_order_item'

spark.sql(f'''
CREATE TABLE IF NOT EXISTS {fact_table}(
order_id  string 	 ,
product_id  string   ,
seller_id  string    ,
customer_id  string  ,
address_id  string   ,
order_date  date     ,
delivery_date  date  ,
quantity  int        ,
unit_price  int      ,
discount  int        ,
is_cancelled  int    , 
is_returned  int     ,
duration  int        ,
total_amount  int 
)
USING DELTA 
TBLPROPERTIES(QUALITY='SILVER')
''')

# insertInto maps columns by position, so orders_source must keep the same
# column order as the DDL above.
fact_writer = orders_source.write.mode('append')
fact_writer.insertInto(fact_table)

# Why no MERGE here:
# - SCD Type 2 could be applied to the fact table to track changes such as
#   delivery status over time.
# - For now the load is append-only, because the dataset already carries
#   cancellation and return flags (is_cancelled, is_returned), which are
#   sufficient for downstream analysis.
#
# Decision summary
#   Use append-only?        Yes
#   Include status flags?   Already present (is_cancelled, etc.)
#   Apply SCD Type 2 now?   No -- skipped for now (adds complexity)
#
# If the use case expands later (e.g. order status lifecycle tracking), the SCD
# Type 2 logic for the fact table can be revisited.
#
# NOTE(review): nothing here checks what is already in the target, so re-running
# this cell against the same raw data appends the same rows again.
# ========================================================================================================================