In [0]:
# Make globalretail_silver the current database so the unqualified table
# names used in the statements below resolve against it.
spark.sql("use globalretail_silver")

# DDL for the silver customer table. "if not exists" makes re-running this
# cell a no-op once the table has been created.
silver_customers_ddl = """
          create table if not exists silver_customers (
          customer_id string,
          name string,
          email string,
          country string,
          customer_type string,
          registration_date string,
          age int,
          gender string,
          total_purchases int,
          customer_segment string,
          days_since_registration int,
          last_updated timestamp
          )
          """
spark.sql(silver_customers_ddl)

In [0]:
# Get the silver layer's high-water mark: the most recent last_updated value.
#
# The timestamp is cast to a string inside Spark, so it is rendered in the Spark
# session time zone. Collecting the raw timestamp column instead produces a naive
# Python datetime expressed in the driver's local time zone; when that value is
# later embedded in SQL text it is re-parsed in the session time zone, which
# silently shifts the watermark whenever the two time zones differ.
last_processed_df = spark.sql(
    "select cast(max(last_updated) as string) as last_processed from silver_customers"
)
# An aggregate without GROUP BY always yields exactly one row.
last_processed_timestamp = last_processed_df.first()["last_processed"]

# max() is NULL when silver_customers is empty (first run). Fall back to a very
# early timestamp so a "greater than watermark" filter selects all existing rows.
if last_processed_timestamp is None:
    last_processed_timestamp = '1900-01-01T00:00:00.000+00:00'

In [0]:
# Register a temporary view holding only the bronze customer rows whose
# ingestion_timestamp is later than the silver watermark computed earlier.
bronze_incremental_sql = f"""
          create or replace temporary view bronze_incremental as
          select *
          from globalretail_bronze.bronze_customer as c where c.ingestion_timestamp > '{last_processed_timestamp}'
          """
spark.sql(bronze_incremental_sql)

In [0]:
# Preview the bronze rows selected for this incremental run.
bronze_incremental_df = spark.sql("select * from bronze_incremental")
bronze_incremental_df.show()

In [0]:
# keep only records whose email address is not null
# keep only records whose age is between 18 and 100 (inclusive)
# derive customer_segment: 'High Value' when total_purchases > 10000, 'Medium Value' when total_purchases > 5000, otherwise 'Low Value'
# derive days_since_registration: number of days between registration_date and the current date
# remove junk records where total_purchases is a negative number

In [0]:
# Build the cleaned and enriched batch that is merged into silver_customers.
#
# Validation (inner query): age between 18 and 100, email present, and
# total_purchases not negative.
#
# De-duplication (recency_rank): when the batch contains several valid rows for
# the same customer_id, only the most recently ingested one is kept. The merge
# into silver_customers matches on customer_id; with duplicate keys in the source
# it would either fail because several source rows match one target row
# (assuming silver_customers is a Delta table) or insert the same customer twice.
#
# Enrichment (outer query): customer_segment from total_purchases thresholds,
# days_since_registration relative to the current date, and a last_updated
# processing timestamp. The helper column recency_rank is not projected, so the
# view's columns still line up with silver_customers.
spark.sql("""
          create or replace temporary view silver_incremental as
          select
          customer_id,
          name,
          email,
          country,
          customer_type,
          registration_date,
          age,
          gender,
          total_purchases,
          case
          when total_purchases > 10000 then 'High Value'
          when total_purchases > 5000 then 'Medium Value'
          else 'Low Value'
          end as customer_segment,
          datediff(current_date(), registration_date) as days_since_registration,
          current_timestamp() as last_updated
          from (
              select
              *,
              row_number() over (
                  partition by customer_id
                  order by ingestion_timestamp desc
              ) as recency_rank
              from bronze_incremental
              where age between 18 and 100
              and email is not null
              and total_purchases >= 0
          ) as valid_customers
          where recency_rank = 1
          """)

In [0]:
# Inspect the transformed batch before it is merged into the silver table.
silver_incremental_df = spark.sql("select * from silver_incremental")
display(silver_incremental_df)

In [0]:
# Upsert the transformed batch into silver_customers, matching on customer_id:
# rows that already exist are overwritten with the source values, and rows
# with no match are inserted.
merge_silver_customers_sql = """
          merge into silver_customers target
          using silver_incremental source
          on target.customer_id = source.customer_id
          when matched then update set *
          when not matched then insert *
          """
spark.sql(merge_silver_customers_sql)

In [0]:
# Show the total number of rows in silver_customers after the merge.
silver_customers_count_df = spark.sql("select count(*) from silver_customers")
silver_customers_count_df.show()