In [0]:
%sql
create database if not exists fact

In [0]:
%sql
create table if not exists fact.Sales(
  RowId INT,  
  OrderID varchar(20),
  CustomerID varchar(20),
  ProductId VARCHAR(20), 
  OrderDate DATE, 
  Quantity INT, 
  Discount FLOAT, 
  Profit FLOAT, 
  Sales FLOAT, 
	createdDate timestamp,
  updatedDate timestamp
)

In [0]:
%sql
Select * from cs_raw.invoice

Row_ID,Order_ID,Order_Date,Ship_Date,Ship_Mode,Customer_ID,Customer_Name,Segment,Country,City,State,Postal_Code,Region,Product_ID,Category,Sub_Category,Product_Name,Sales,Quantity,Discount,Profit
1,CA-2016-152156,2016-11-08 00:00:00.0000000,2016-11-11 00:00:00.0000000,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-BO-10001798,Furniture,Chairs,Bush Somerset Collection Bookcase,261.96,2,0.0,41.9136
2,CA-2016-152156,2016-11-08 00:00:00.0000000,2016-11-11 00:00:00.0000000,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back",731.9399999999998,3,0.0,219.582
3,CA-2016-138688,2016-06-12 00:00:00.0000000,2016-06-16 00:00:00.0000000,Second Class,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036,West,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters by Universal,14.62,2,0.0,6.8714
4,US-2015-108966,2015-10-11 00:00:00.0000000,2015-10-18 00:00:00.0000000,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,FUR-TA-10000577,Furniture,Tables,Bretford CR4500 Series Slim Rectangular Table,957.5775000000002,5,0.45,-383.03100000000006
5,US-2015-108966,2015-10-11 00:00:00.0000000,2015-10-18 00:00:00.0000000,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,OFF-ST-10000760,Office Supplies,Storage,Eldon Fold 'N Roll Cart System,22.368,2,0.2,2.516399999999999
6,CA-2014-115812,2014-06-09 00:00:00.0000000,2014-06-14 00:00:00.0000000,Standard Class,BH-11710,Brosina Hoffman,Consumer,Pakistan,Los Angeles,California,90032,West,FUR-FU-10001487,Furniture,Furnishings,"Eldon Expressions Wood and Plastic Desk Accessories, Cherry Wood",48.86,7,0.0,14.169399999999996
7,CA-2014-115812,2014-06-09 00:00:00.0000000,2014-06-14 00:00:00.0000000,Standard Class,BH-11710,Brosina Hoffman,Consumer,Pakistan,Los Angeles,California,90032,West,OFF-AR-10002833,Office Supplies,Art,Newell 322,7.28,4,0.0,1.9656
8,CA-2014-115812,2014-06-09 00:00:00.0000000,2014-06-14 00:00:00.0000000,Standard Class,BH-11710,Brosina Hoffman,Consumer,Pakistan,Los Angeles,California,90032,West,TEC-PH-10002275,Technology,Phones,Mitel 5320 IP Phone VoIP phone,907.152,6,0.2,90.71520000000004
9,CA-2014-115812,2014-06-09 00:00:00.0000000,2014-06-14 00:00:00.0000000,Standard Class,BH-11710,Brosina Hoffman,Consumer,Pakistan,Los Angeles,California,90032,West,OFF-BI-10003910,Office Supplies,Binders,DXL Angle-View Binders with Locking Rings by Samsill,18.504,3,0.2,5.7825
10,CA-2014-115812,2014-06-09 00:00:00.0000000,2014-06-14 00:00:00.0000000,Standard Class,BH-11710,Brosina Hoffman,Consumer,Pakistan,Los Angeles,California,90032,West,OFF-AP-10002892,Office Supplies,Appliances,Belkin F5C206VTEL 6 Outlet Surge,114.9,5,0.0,34.46999999999999


In [0]:
from pyspark.sql.functions import current_timestamp

# Load the source DataFrame from the raw invoice table
src_df = spark.sql("""
    SELECT DISTINCT 
        Row_ID, Order_ID, Customer_ID, Product_ID, Order_Date, Quantity, Discount, Profit, Sales 
    FROM cs_raw.invoice
""")

# Load the dimension tables for validation
dim_customer = spark.table("dim.Customer")
dim_product = spark.table("dim.Product")
dim_shipmode = spark.table("dim.Shipmode")

# Perform the necessary joins on the dimension tables to filter the source data
valid_src_df = src_df.join(dim_customer, src_df.Customer_ID == dim_customer.CustomerID, "inner") \
    .join(dim_product, src_df.Product_ID == dim_product.ProductID, "inner") \
    .join(dim_shipmode, src_df.Order_ID == dim_shipmode.OrderId, "inner") \
    .select(src_df["*"])

# Define the target table path (if using Delta Lake)
target_table = "fact.Sales"

# Load the target table
from delta.tables import DeltaTable
tgt_tbl = DeltaTable.forName(spark, target_table)

# Perform the merge operation
tgt_tbl.alias("tgt").merge(
    valid_src_df.alias("src"),
    "tgt.RowId = src.Row_ID"
).whenMatchedUpdate(
     condition=""" 
        tgt.OrderDate <> src.Order_Date OR
        tgt.Quantity <> src.Quantity OR
        tgt.Discount <> src.Discount OR
        tgt.Profit <> src.Profit OR
        tgt.Sales <> src.Sales 
    """,
    set={
        "OrderDate": "src.Order_Date",
        "Quantity": "src.Quantity",
        "Discount": "src.Discount",
        "Profit": "src.Profit",
        "Sales": "src.Sales",
        "updatedDate": "current_timestamp()"
    }
).whenNotMatchedInsert(values={
    "RowId": "src.Row_ID",
    "OrderID": "src.Order_ID",
    "CustomerID": "src.Customer_ID",
    "ProductId": "src.Product_ID",
    "OrderDate": "src.Order_Date",
    "Quantity": "src.Quantity",
    "Discount": "src.Discount",
    "Profit": "src.Profit",
    "Sales": "src.Sales",
    "createdDate": "current_timestamp()",
    "updatedDate": "current_timestamp()"
}).execute()


In [0]:
%sql
Select * from fact.Sales
order by RowId 

RowId,OrderID,CustomerID,ProductId,OrderDate,Quantity,Discount,Profit,Sales,createdDate,updatedDate
1,CA-2016-152156,CG-12520,FUR-BO-10001798,2016-11-08,2,0.0,41.9136,261.96,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
2,CA-2016-152156,CG-12520,FUR-CH-10000454,2016-11-08,3,0.0,219.582,731.94,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
3,CA-2016-138688,DV-13045,OFF-LA-10000240,2016-06-12,2,0.0,6.8714,14.62,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
4,US-2015-108966,SO-20335,FUR-TA-10000577,2015-10-11,5,0.45,-383.031,957.5775,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
5,US-2015-108966,SO-20335,OFF-ST-10000760,2015-10-11,2,0.2,2.5164,22.368,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
6,CA-2014-115812,BH-11710,FUR-FU-10001487,2014-06-09,7,0.0,14.1694,48.86,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
7,CA-2014-115812,BH-11710,OFF-AR-10002833,2014-06-09,4,0.0,1.9656,7.28,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
8,CA-2014-115812,BH-11710,TEC-PH-10002275,2014-06-09,6,0.2,90.7152,907.152,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
9,CA-2014-115812,BH-11710,OFF-BI-10003910,2014-06-09,3,0.2,5.7825,18.504,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
10,CA-2014-115812,BH-11710,OFF-AP-10002892,2014-06-09,5,0.0,34.47,114.9,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z


In [0]:
%sql
Select * from fact.Sales

RowId,OrderID,CustomerID,ProductId,OrderDate,Quantity,Discount,Profit,Sales,createdDate,updatedDate
23,CA-2016-137330,KB-16585,OFF-AP-10001492,2016-12-09,7,0.0,15.6884,60.34,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
49,CA-2016-169194,LH-16900,TEC-PH-10003988,2016-06-20,2,0.0,6.104,21.8,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
20,CA-2014-143336,ZD-21925,TEC-PH-10001949,2014-08-27,3,0.2,16.011,213.48,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
18,CA-2014-167164,AG-10270,OFF-ST-10000107,2014-05-13,2,0.0,9.99,55.5,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
27,CA-2016-121755,EH-13945,TEC-AC-10003027,2016-01-16,3,0.0,11.7741,90.57,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
30,US-2015-150630,TB-21520,FUR-FU-10004848,2015-09-17,3,0.2,15.525,124.2,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
46,CA-2016-118255,ON-18715,OFF-BI-10003291,2016-03-11,2,0.0,8.2062,17.46,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
2,CA-2016-152156,CG-12520,FUR-CH-10000454,2016-11-08,3,0.0,219.582,731.94,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
33,US-2015-150630,TB-21520,OFF-BI-10001525,2015-09-17,6,0.7,-5.715,6.858,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
32,US-2015-150630,TB-21520,OFF-AR-10004042,2015-09-17,6,0.2,9.7092,86.304,2025-01-31T07:43:02.518Z,2025-01-31T07:43:02.518Z
