In [0]:
-- Set the session's current catalog and schema so that the unqualified
-- table names used in the following cells (control_table, loan_data,
-- loan_source_data) resolve inside my_catalog.default.
use catalog my_catalog;
use schema default;

Create control table

In [0]:
-- Control (watermark) table: one row per tracked table, holding the
-- timestamp up to which that table has already been loaded.
-- "if not exists" keeps this cell re-runnable: a plain "create table"
-- fails as soon as the table exists from a previous run.
create table if not exists control_table(
  table_name string,               -- name of the tracked target table, e.g. 'loan_data'
  last_loaded_timestamp timestamp  -- high-water mark of last_updated_timestamp already loaded
)

Create loan_data table and insert some sample records

In [0]:
%python
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from datetime import datetime

# Sample data
data = [
    (1, "John Doe", 5000, "approved", datetime(2024, 7, 1, 10, 0, 0)),
    (2, "Jane Smith", 8000, "pending", datetime(2024, 7, 2, 11, 0, 0)),
    (3, "Alice Johnson", 12000, "rejected", datetime(2024, 7, 3, 9, 30, 0)),
    (4, "Bob Lee", 7000, "approved", datetime(2024, 7, 4, 14, 15, 0))
]

columns = ["loan_id", "customer_name", "loan_amount", "loan_status", "last_updated_timestamp"]

df = spark.createDataFrame(data, schema=columns)
df.show()

df.write.format('delta').saveAsTable('loan_data')


In [0]:
-- Inspect the freshly created target table.
select
  loan_id,
  customer_name,
  loan_amount,
  loan_status,
  last_updated_timestamp
from loan_data

Register loan_data in the control table and set its last_loaded_timestamp to the maximum last_updated_timestamp found in the loan_data table

In [0]:
-- Upsert the watermark row for loan_data in a single statement.
--   * Only the row whose table_name is 'loan_data' is touched; an UPDATE
--     without a WHERE clause would overwrite the watermark of every table
--     tracked in control_table.
--   * Re-running the cell updates the existing row instead of inserting a
--     duplicate 'loan_data' row.
-- The watermark is the newest last_updated_timestamp currently in loan_data
-- (NULL if loan_data is empty).
merge into control_table as target
using (
  select
    'loan_data' as table_name,
    max(last_updated_timestamp) as last_loaded_timestamp
  from loan_data
) as source
on target.table_name = source.table_name
when matched then
  update set target.last_loaded_timestamp = source.last_loaded_timestamp
when not matched then
  insert (table_name, last_loaded_timestamp)
  values (source.table_name, source.last_loaded_timestamp);

In [0]:
-- Show the recorded watermark(s).
select
  table_name,
  last_loaded_timestamp
from control_table

Create the loan source table (loan_source_data) with some sample records

In [0]:
%python
from datetime import datetime
from pyspark.sql import Row

# Sample source data
source_data = [
    Row(loan_id=1, customer_name="John Doe", loan_amount=5500, loan_status="approved", last_updated_timestamp=datetime(2024, 7, 6, 12, 0)),
    Row(loan_id=2, customer_name="Jane Smith", loan_amount=8000, loan_status="approved", last_updated_timestamp=datetime(2024, 7, 6, 13, 0)),
    Row(loan_id=3, customer_name="Alice Johnson", loan_amount=12000, loan_status="rejected", last_updated_timestamp=datetime(2024, 7, 3, 9, 30)),
    Row(loan_id=5, customer_name="Carlos Gomez", loan_amount=9000, loan_status="pending", last_updated_timestamp=datetime(2024, 7, 6, 15, 0)),
]

# Create DataFrame
source_df = spark.createDataFrame(source_data)

# Show for verification
source_df.show()

# Save as Delta table
source_df.write.format("delta").mode("overwrite").saveAsTable("default.loan_source_data")


In [0]:
-- Inspect the source rows that are candidates for the incremental load.
select
  loan_id,
  customer_name,
  loan_amount,
  loan_status,
  last_updated_timestamp
from loan_source_data

In [0]:
-- Target table contents before the merge, for comparison with the source.
select
  loan_id,
  customer_name,
  loan_amount,
  loan_status,
  last_updated_timestamp
from loan_data

Select the source records whose last_updated_timestamp is later than the last_loaded_timestamp stored in the control table, and merge them into loan_data

In [0]:
-- Incremental load: upsert into loan_data only those source rows that are
-- newer than the watermark recorded for 'loan_data' in control_table.
--   * Matching loan_id  -> every column of the target row is overwritten.
--   * Unknown loan_id   -> the row is inserted.
-- The filter lives in the USING subquery rather than in a CTE placed in
-- front of MERGE, so the statement does not depend on the runtime
-- supporting CTE-prefixed MERGE.
-- NOTE(review): nothing in this cell advances
-- control_table.last_loaded_timestamp after the merge; unless a later step
-- does so, the same source rows will be selected again on the next run.
merge into loan_data as target
using (
  select
    loan_id,
    customer_name,
    loan_amount,
    loan_status,
    last_updated_timestamp
  from loan_source_data
  where last_updated_timestamp > (
    -- max() guarantees exactly one value even if control_table holds
    -- several 'loan_data' rows (a plain scalar subquery would raise an
    -- error); coalesce() falls back to an early timestamp so that a
    -- missing or NULL watermark means "load everything" rather than
    -- silently loading nothing.
    select coalesce(max(last_loaded_timestamp), timestamp '1900-01-01 00:00:00')
    from control_table
    where table_name = 'loan_data'
  )
) as source
on target.loan_id = source.loan_id
when matched then
  update set *
when not matched then
  insert *

In [0]:
-- Target table contents after the merge. Ordering by the named key column
-- instead of by ordinal position keeps the sort stable if the column list
-- or the table's column order ever changes.
select
  loan_id,
  customer_name,
  loan_amount,
  loan_status,
  last_updated_timestamp
from loan_data
order by loan_id