In [0]:
# Work inside the curated ("md") schema for the rest of the notebook.
spark.sql("use md_globalretail")

# Target table for cleansed customers. Besides the customer attributes it holds
# two derived columns (customer_segment, days_since_registration) and a
# last_updated timestamp. `if not exists` makes this cell safe to re-run.
customers_ddl = """
          create table if not exists md_customers(
            customer_id string,
            name string,
            email string,
            country string,
            customer_type string,
            registration_date date,
            age int,
            gender string,
            total_purchases int,
            customer_segment string,
            days_since_registration int,
            last_updated timestamp
          )
          """
spark.sql(customers_ddl)

Out[3]: DataFrame[]

In [0]:
# High-water mark for the incremental load: the newest `last_updated` value
# already stored in md_customers.
#
# The timestamp is cast to a string inside Spark so it is rendered in the Spark
# session time zone. Collecting the raw timestamp would instead hand back a
# naive Python datetime in the driver's local time zone; once that value is
# interpolated into SQL and re-parsed by Spark the watermark shifts whenever
# the two zones differ. The cast also keeps this variable a `str` on every run.
last_processed_df=spark.sql(
    "select cast(max(last_updated) as string) as last_processed from md_customers"
)
# An aggregate without GROUP BY always yields exactly one row.
last_processed_timestamp=last_processed_df.first()['last_processed']

# max() over an empty table is NULL (None here): fall back to a far-past
# sentinel so the first run behaves like a full load.
if last_processed_timestamp is None:
  last_processed_timestamp='1900-01-01T00:00:00.000+00:00'

In [0]:
# Display the high-water mark that the incremental filter will compare
# ingestion_timestamp against.
last_processed_timestamp

Out[18]: '1900-01-01T00:00:00.000+00:00'

In [0]:
# Expose only the raw customer rows ingested after the high-water mark as a
# temporary view; later cells read from this view instead of the raw table.
incremental_view_sql = f"""
          create or replace temporary view rw_customer_incremental as
          select * from rw_globalretail.rw_customer c where c.ingestion_timestamp > '{last_processed_timestamp}'
          """
spark.sql(incremental_view_sql)

Out[19]: DataFrame[]

In [0]:
%sql
-- Preview the raw customer rows selected for this incremental run.
select *
from rw_customer_incremental

customer_id,name,email,country,customer_type,registration_date,age,gender,total_purchases,ingestion_timestamp
1,Customer 1,customer1@example.com,Australia,Regular,2011-05-15,22,Male,191,2024-11-10T22:06:38.343+0000
2,Customer 2,customer2@example.com,France,Premium,2018-11-27,52,Other,145,2024-11-10T22:06:38.343+0000
3,Customer 3,customer3@example.com,Canada,Premium,2015-10-01,32,Other,691,2024-11-10T22:06:38.343+0000
4,Customer 4,customer4@example.com,USA,Premium,2011-01-19,70,Other,644,2024-11-10T22:06:38.343+0000
5,Customer 5,customer5@example.com,Germany,Regular,2021-08-26,66,Other,508,2024-11-10T22:06:38.343+0000
6,Customer 6,customer6@example.com,France,Premium,2015-03-02,20,Male,704,2024-11-10T22:06:38.343+0000
7,Customer 7,customer7@example.com,China,Premium,2018-05-24,24,Female,892,2024-11-10T22:06:38.343+0000
8,Customer 8,customer8@example.com,China,Regular,2023-10-02,26,Male,488,2024-11-10T22:06:38.343+0000
9,Customer 9,customer9@example.com,Japan,Premium,2014-10-05,36,Other,30,2024-11-10T22:06:38.343+0000
10,Customer 10,customer10@example.com,Brazil,Premium,2017-08-30,30,Male,959,2024-11-10T22:06:38.343+0000


In [0]:
# Business rules applied while building the curated customer increment:
#   1. Validate the email address (must not be null).
#   2. Validate that age is between 18 and 100.
#   3. Derive customer_segment from total_purchases:
#        > 10000 -> 'High Value', > 5000 -> 'Medium Value', else 'Low Value'.
#      NOTE(review): the original written spec for this cell said "> 500" for
#      Medium Value, but the query uses 5000 — confirm the intended threshold.
#   4. Derive days_since_registration as the number of days between
#      registration_date and the current date.
#   5. Remove junk records where total_purchases is negative.
#   6. Keep a single row per customer_id (the most recently ingested row that
#      passes the checks above). The downstream MERGE matches on customer_id
#      and fails when several source rows match the same target row.
#      Ties on ingestion_timestamp are broken arbitrarily.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW md_customer_incremental AS
WITH validated AS (
    SELECT
        customer_id,
        name,
        email,
        country,
        customer_type,
        registration_date,
        age,
        gender,
        total_purchases,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY ingestion_timestamp DESC
        ) AS recency_rank
    FROM rw_customer_incremental
    WHERE
        age BETWEEN 18 AND 100
        AND email IS NOT NULL
        AND total_purchases >= 0
)
SELECT
    customer_id,
    name,
    email,
    country,
    customer_type,
    registration_date,
    age,
    gender,
    total_purchases,
    CASE
        WHEN total_purchases > 10000 THEN 'High Value'
        WHEN total_purchases > 5000 THEN 'Medium Value'
        ELSE 'Low Value'
    END AS customer_segment,
    DATEDIFF(CURRENT_DATE(), registration_date) AS days_since_registration,
    CURRENT_TIMESTAMP() AS last_updated
FROM validated
WHERE recency_rank = 1
""")

Out[21]: DataFrame[]

In [0]:
%sql
select * from md_customer_incremental

customer_id,name,email,country,customer_type,registration_date,age,gender,total_purchases,customer_segment,days_since_registration,last_updated
1,Customer 1,customer1@example.com,Australia,Regular,2011-05-15,22,Male,191,Low Value,4928,2024-11-10T22:22:47.378+0000
2,Customer 2,customer2@example.com,France,Premium,2018-11-27,52,Other,145,Low Value,2175,2024-11-10T22:22:47.378+0000
3,Customer 3,customer3@example.com,Canada,Premium,2015-10-01,32,Other,691,Low Value,3328,2024-11-10T22:22:47.378+0000
4,Customer 4,customer4@example.com,USA,Premium,2011-01-19,70,Other,644,Low Value,5044,2024-11-10T22:22:47.378+0000
5,Customer 5,customer5@example.com,Germany,Regular,2021-08-26,66,Other,508,Low Value,1172,2024-11-10T22:22:47.378+0000
6,Customer 6,customer6@example.com,France,Premium,2015-03-02,20,Male,704,Low Value,3541,2024-11-10T22:22:47.378+0000
7,Customer 7,customer7@example.com,China,Premium,2018-05-24,24,Female,892,Low Value,2362,2024-11-10T22:22:47.378+0000
8,Customer 8,customer8@example.com,China,Regular,2023-10-02,26,Male,488,Low Value,405,2024-11-10T22:22:47.378+0000
9,Customer 9,customer9@example.com,Japan,Premium,2014-10-05,36,Other,30,Low Value,3689,2024-11-10T22:22:47.378+0000
10,Customer 10,customer10@example.com,Brazil,Premium,2017-08-30,30,Male,959,Low Value,2629,2024-11-10T22:22:47.378+0000


In [0]:
# Upsert the curated increment into md_customers, keyed on customer_id:
# existing customers have every column overwritten from the source row,
# unseen customers are inserted. `SET *` / `INSERT *` pair columns by name.
merge_stmt = """
MERGE INTO md_customers target
USING md_customer_incremental source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT *
"""
spark.sql(merge_stmt)


Out[23]: DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
-- Inspect the curated customer table after the merge.
SELECT *
FROM md_customers

customer_id,name,email,country,customer_type,registration_date,age,gender,total_purchases,customer_segment,days_since_registration,last_updated
1,Customer 1,customer1@example.com,Australia,Regular,2011-05-15,22,Male,191,Low Value,4928,2024-11-10T22:27:37.050+0000
2,Customer 2,customer2@example.com,France,Premium,2018-11-27,52,Other,145,Low Value,2175,2024-11-10T22:27:37.050+0000
3,Customer 3,customer3@example.com,Canada,Premium,2015-10-01,32,Other,691,Low Value,3328,2024-11-10T22:27:37.050+0000
4,Customer 4,customer4@example.com,USA,Premium,2011-01-19,70,Other,644,Low Value,5044,2024-11-10T22:27:37.050+0000
5,Customer 5,customer5@example.com,Germany,Regular,2021-08-26,66,Other,508,Low Value,1172,2024-11-10T22:27:37.050+0000
6,Customer 6,customer6@example.com,France,Premium,2015-03-02,20,Male,704,Low Value,3541,2024-11-10T22:27:37.050+0000
7,Customer 7,customer7@example.com,China,Premium,2018-05-24,24,Female,892,Low Value,2362,2024-11-10T22:27:37.050+0000
8,Customer 8,customer8@example.com,China,Regular,2023-10-02,26,Male,488,Low Value,405,2024-11-10T22:27:37.050+0000
9,Customer 9,customer9@example.com,Japan,Premium,2014-10-05,36,Other,30,Low Value,3689,2024-11-10T22:27:37.050+0000
10,Customer 10,customer10@example.com,Brazil,Premium,2017-08-30,30,Male,959,Low Value,2629,2024-11-10T22:27:37.050+0000
