In [0]:
# Switch the session's current database so the unqualified table names used in
# the rest of this notebook (silver_customers) resolve to the silver schema.
spark.sql("USE globalretail_silver")

# Silver-layer customer table: the validated bronze customer columns, the derived
# customer_segment / days_since_registration columns, and last_updated (the
# processing timestamp written by each run).
# USING DELTA is stated explicitly because this table is the target of a
# MERGE INTO further down; MERGE requires a table format that supports it, and
# relying on the platform's default format would break the MERGE wherever that
# default is not Delta. IF NOT EXISTS leaves an already-created table untouched.
spark.sql("""
    CREATE TABLE IF NOT EXISTS silver_customers (
        customer_id STRING,
        name STRING,
        email STRING,
        country STRING,
        customer_type STRING,
        registration_date DATE,
        age INT,
        gender STRING,
        total_purchases INT,
        customer_segment STRING,
        days_since_registration INT,
        last_updated TIMESTAMP)
    USING DELTA
""")

DataFrame[]

In [0]:
# Get the last processed timestamp from the silver layer (incremental watermark).
# The MAX is rendered to a string *inside Spark*, in the session time zone. The
# next cell interpolates this string into its bronze filter, where Spark parses
# it back in that same session time zone. Going through collect() on the raw
# TIMESTAMP instead yields a naive Python datetime in the driver's local time
# zone, which silently shifts the watermark whenever that zone differs from
# spark.sql.session.timeZone.
# COALESCE supplies a far-past default when silver_customers is empty (first
# run), so last_processed_timestamp is always a string.
# NOTE(review): last_updated is the silver *processing* time, but it is compared
# against bronze *ingestion* time. Bronze rows whose ingestion_timestamp is older
# than the last silver run (late arrivals, clock skew) would never be picked up.
# Confirm that bronze timestamps are always assigned before silver runs.
last_processed_df = spark.sql("""
    SELECT COALESCE(
        date_format(MAX(last_updated), 'yyyy-MM-dd HH:mm:ss.SSSSSS'),
        '1900-01-01T00:00:00.000+00:00'
    ) AS last_processed
    FROM silver_customers
""")
last_processed_timestamp = last_processed_df.collect()[0]['last_processed']

In [0]:
# Stage the bronze customer rows that arrived after the watermark from the
# previous cell, and expose them as the temporary view `bronze_incremental`.
# The watermark is interpolated as a literal, so its value is fixed when this
# cell runs. The bronze table itself is read whenever the view is queried.
bronze_incremental_sql = f"""
CREATE OR REPLACE TEMPORARY VIEW bronze_incremental AS
SELECT *
FROM globalretail_bronze.bronze_customer AS c
WHERE c.ingestion_timestamp > '{last_processed_timestamp}'
"""
spark.sql(bronze_incremental_sql)

DataFrame[]

In [0]:
# Preview the staged bronze batch (first 20 rows, truncated columns).
spark.table("bronze_incremental").show()

+-----------+-----------+--------------------+---------+-------------+-----------------+---+------+---------------+--------------------+
|customer_id|       name|               email|  country|customer_type|registration_date|age|gender|total_purchases| ingestion_timestamp|
+-----------+-----------+--------------------+---------+-------------+-----------------+---+------+---------------+--------------------+
|          1| Customer 1|customer1@example...|Australia|      Regular|       2011-05-15| 22|  Male|            191|2024-07-19 04:05:...|
|          2| Customer 2|customer2@example...|   France|      Premium|       2018-11-27| 52| Other|            145|2024-07-19 04:05:...|
|          3| Customer 3|customer3@example...|   Canada|      Premium|       2015-10-01| 32| Other|            691|2024-07-19 04:05:...|
|          4| Customer 4|customer4@example...|      USA|      Premium|       2011-01-19| 70| Other|            644|2024-07-19 04:05:...|
|          5| Customer 5|customer5@exampl

In [0]:
# Silver-layer cleansing rules:
# - Validate email addresses: keep only rows where email is not null
# - Validate age: keep only ages between 18 and 100 (inclusive)
# - Derive customer_segment: total_purchases > 10000 -> 'High Value', > 5000 -> 'Medium Value', else 'Low Value'
# - Derive days_since_registration: number of days since the customer registered in the system
# - Remove junk records where total_purchases is a negative number


In [0]:
# Build the silver batch from the staged bronze rows:
#   - drop invalid/junk rows: missing customer_id, age outside 18-100,
#     missing email, or negative total_purchases;
#   - derive customer_segment and days_since_registration;
#   - stamp every row with this run's processing time (last_updated);
#   - keep only the most recent valid row per customer_id.
# Why de-duplicate: the MERGE below matches on customer_id. Delta's MERGE fails
# when several source rows match the same target row, and duplicate rows for a
# customer not yet in silver would all be inserted, repeating that customer_id.
# Why require customer_id: a NULL key never satisfies the MERGE's equality
# condition, so such rows would always be inserted as new, unkeyed records.
# NOTE(review): if two rows for a customer share the same ingestion_timestamp,
# ROW_NUMBER picks one of them arbitrarily.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW silver_incremental AS
WITH validated AS (
    SELECT
        customer_id,
        name,
        email,
        country,
        customer_type,
        registration_date,
        age,
        gender,
        total_purchases,
        CASE
            WHEN total_purchases > 10000 THEN 'High Value'
            WHEN total_purchases > 5000 THEN 'Medium Value'
            ELSE 'Low Value'
        END AS customer_segment,
        DATEDIFF(CURRENT_DATE(), registration_date) AS days_since_registration,
        CURRENT_TIMESTAMP() AS last_updated,
        -- Rank each customer's valid rows newest-first by bronze ingestion time.
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY ingestion_timestamp DESC
        ) AS row_rank
    FROM bronze_incremental
    WHERE
        customer_id IS NOT NULL
        AND age BETWEEN 18 AND 100
        AND email IS NOT NULL
        AND total_purchases >= 0
)
SELECT
    customer_id,
    name,
    email,
    country,
    customer_type,
    registration_date,
    age,
    gender,
    total_purchases,
    customer_segment,
    days_since_registration,
    last_updated
FROM validated
WHERE row_rank = 1
""")

DataFrame[]

In [0]:
# Inspect the cleansed batch before it is merged into silver_customers.
display(spark.table("silver_incremental"))

customer_id,name,email,country,customer_type,registration_date,age,gender,total_purchases,customer_segment,days_since_registration,last_updated
1,Customer 1,customer1@example.com,Australia,Regular,2011-05-15,22,Male,191,Low Value,4814,2024-07-19T05:12:55.012Z
2,Customer 2,customer2@example.com,France,Premium,2018-11-27,52,Other,145,Low Value,2061,2024-07-19T05:12:55.012Z
3,Customer 3,customer3@example.com,Canada,Premium,2015-10-01,32,Other,691,Low Value,3214,2024-07-19T05:12:55.012Z
4,Customer 4,customer4@example.com,USA,Premium,2011-01-19,70,Other,644,Low Value,4930,2024-07-19T05:12:55.012Z
5,Customer 5,customer5@example.com,Germany,Regular,2021-08-26,66,Other,508,Low Value,1058,2024-07-19T05:12:55.012Z
6,Customer 6,customer6@example.com,France,Premium,2015-03-02,20,Male,704,Low Value,3427,2024-07-19T05:12:55.012Z
7,Customer 7,customer7@example.com,China,Premium,2018-05-24,24,Female,892,Low Value,2248,2024-07-19T05:12:55.012Z
8,Customer 8,customer8@example.com,China,Regular,2023-10-02,26,Male,488,Low Value,291,2024-07-19T05:12:55.012Z
9,Customer 9,customer9@example.com,Japan,Premium,2014-10-05,36,Other,30,Low Value,3575,2024-07-19T05:12:55.012Z
10,Customer 10,customer10@example.com,Brazil,Premium,2017-08-30,30,Male,959,Low Value,2515,2024-07-19T05:12:55.012Z


In [0]:

# Upsert the batch into silver, keyed on customer_id. Existing customers have
# all columns overwritten from the batch, and new customers are inserted.
# UPDATE SET * / INSERT * expand to the target's columns, so silver_incremental
# must provide every silver_customers column under the same name.
upsert_sql = """
MERGE INTO silver_customers AS target
USING silver_incremental AS source
    ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(upsert_sql)


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Sanity check: total number of rows now in the silver customer table.
spark.table("silver_customers").selectExpr("count(*)").show()

+--------+
|count(1)|
+--------+
|     930|
+--------+

