In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime


## VIEW RAW DATA

In [0]:
%sql
-- Inspect the raw source table that feeds the bronze layer.
SELECT *
FROM workspace.raw.rawdata

OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Quantity,SalesAmount,lastupdated
1001,2025-08-01,C001,John Doe,P001,Laptop,1,1200.0,2025-08-10T06:52:44.088Z
1002,2025-08-01,C002,Jane Smith,P002,Mouse,2,50.0,2025-08-10T06:52:44.088Z
1003,2025-08-02,C001,John Doe,P003,Keyboard,1,80.0,2025-08-10T06:52:44.088Z
1004,2025-08-03,C003,Mark Lee,P001,Laptop,1,1200.0,2025-08-10T06:52:44.088Z
1005,2025-08-04,C004,Susan Ray,P004,Monitor,2,300.0,2025-08-10T06:52:44.088Z


## GET LAST LOAD WATERMARK (MAX lastupdated IN BRONZE)

In [0]:
# Watermark for the incremental load: the newest `lastupdated` already present in bronze.
# Falls back to DEFAULT_LAST_LOAD_TIME when the bronze table does not exist yet (first run)
# or when MAX() yields NULL (e.g. the table exists but is empty).
# NOTE(review): raw rows with lastupdated at or before this cutoff are never picked up on a
# first load — confirm the 2024-01-01 default is intended.
DEFAULT_LAST_LOAD_TIME = datetime(2024, 1, 1, 0, 0, 0)

last_load_time = None
if spark.catalog.tableExists("workspace.bronze.bronzetable"):
    last_load_time = spark.sql(
        "SELECT MAX(lastupdated) FROM workspace.bronze.bronzetable"
    ).collect()[0][0]

# Single fallback path instead of repeating the default literal in two branches.
if last_load_time is None:
    last_load_time = DEFAULT_LAST_LOAD_TIME

last_load_time

datetime.datetime(2025, 8, 10, 6, 52, 44, 88555)

## CREATE INCREMENTAL SOURCE VIEW (RAW ROWS NEWER THAN THE WATERMARK)

In [0]:
# Build the incremental slice of the raw table as the temp view `bronze_source`:
# only rows whose `lastupdated` is newer than the watermark `last_load_time`.
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

# Latest version first within each order, so row 1 is the most recent change.
latest_per_order = Window.partitionBy("OrderID").orderBy(col("lastupdated").desc())

(
    spark.table("workspace.raw.rawdata")
    # Compare against a typed timestamp literal rather than interpolating str(datetime)
    # into SQL text, so the value is not re-parsed in the session time zone.
    .filter(col("lastupdated") > lit(last_load_time))
    # Keep a single (most recent) row per OrderID: the MERGE into bronze joins on OrderID
    # and fails if several source rows match the same target row.
    .withColumn("_rn", row_number().over(latest_per_order))
    .filter(col("_rn") == 1)
    .drop("_rn")
    .createOrReplaceTempView("bronze_source")
)

In [0]:
%sql
-- Preview the incremental slice selected by the watermark filter.
SELECT *
FROM bronze_source

OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Quantity,SalesAmount,lastupdated
1006,2025-08-06,C006,Emma White,P006,Webcam,1,90.0,2025-08-10T07:16:19.258Z
1007,2025-08-07,C002,Jane Smith,P002,Mouse,3,75.0,2025-08-10T07:16:19.258Z
1005,2025-08-04,C004,Suresh kumar,P004,Monitor,2,300.0,2025-08-10T07:16:19.258Z


In [0]:
%sql
-- Bootstrap: create the bronze table from the incremental slice if it does not exist yet.
-- Once the table exists this statement is a no-op and the MERGE cell performs the load.
CREATE TABLE IF NOT EXISTS workspace.bronze.bronzetable
AS SELECT *
FROM bronze_source

num_affected_rows,num_inserted_rows


In [0]:
from delta.tables import DeltaTable

In [0]:
# Upsert the incremental slice into bronze keyed on OrderID: matched orders have every
# column overwritten from the source, unmatched orders are inserted. If the bronze table
# is missing, the slice is written out as the initial table instead.
target_name = "workspace.bronze.bronzetable"

if not spark.catalog.tableExists(target_name):
    spark.table("bronze_source").write.saveAsTable(target_name)
else:
    source_df = spark.table("bronze_source").alias("src")
    target = DeltaTable.forName(spark, target_name).alias("trg")
    (
        target
        .merge(source_df, "trg.OrderID = src.OrderID")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Verify the contents of the bronze table after the load.
SELECT *
FROM workspace.bronze.bronzetable

OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Quantity,SalesAmount,lastupdated
1001,2025-08-01,C001,John Doe,P001,Laptop,1,1200.0,2025-08-10T06:52:44.088Z
1002,2025-08-01,C002,Jane Smith,P002,Mouse,2,50.0,2025-08-10T06:52:44.088Z
1003,2025-08-02,C001,John Doe,P003,Keyboard,1,80.0,2025-08-10T06:52:44.088Z
1004,2025-08-03,C003,Mark Lee,P001,Laptop,1,1200.0,2025-08-10T06:52:44.088Z
1006,2025-08-06,C006,Emma White,P006,Webcam,1,90.0,2025-08-10T07:16:19.258Z
1007,2025-08-07,C002,Jane Smith,P002,Mouse,3,75.0,2025-08-10T07:16:19.258Z
1005,2025-08-04,C004,Suresh kumar,P004,Monitor,2,300.0,2025-08-10T07:16:19.258Z
