#### SCHEMA VALIDATION WITH MERGE STATEMENT

In [0]:
%sql
-- IF NOT EXISTS keeps this cell re-runnable (Restart & Run All) once my_db already exists.
CREATE DATABASE IF NOT EXISTS my_db;

In [0]:
%sql
-- Make my_db the current database so the unqualified table names below resolve inside it.
USE my_db;

In [0]:
%sql
-- Target table for the MERGE demos. Its column names intentionally differ from
-- customer_source (cust_id vs customer_id, fname vs first_name, ...).
-- CREATE OR REPLACE resets the table on every run, so the INSERT in the next cell
-- does not pile up duplicate rows when the notebook is re-run.
-- USING DELTA is stated explicitly because this table is the MERGE INTO target below.
CREATE OR REPLACE TABLE customer_target (
    cust_id INT,
    fname STRING,
    lname STRING,
    email STRING,
    contact_number STRING,
    city_name STRING,
    state_code STRING,
    signup_date DATE
)
USING DELTA;




In [0]:
%sql
-- Seed the target with four customers. Jane (2) has an older email and Alice (4) a different
-- city than in the source. Carol (5) exists only here; Bob (3) exists only in the source.
INSERT INTO customer_target (cust_id, fname, lname, email, contact_number, city_name, state_code, signup_date) VALUES
    (1, 'John',  'Doe',      'john.doe@email.com',     '555-0101', 'Seattle',   'WA', '2023-01-15'),
    (2, 'Jane',  'Smith',    'jane.updated@email.com', '555-0202', 'Portland',  'OR', '2023-02-20'),
    (4, 'Alice', 'Williams', 'alice.w@email.com',      '555-0104', 'San Diego', 'CA', '2023-04-05'),
    (5, 'Carol', 'Brown',    'carol.b@email.com',      '555-0105', 'Denver',    'CO', '2023-05-01');

num_affected_rows,num_inserted_rows
4,4


In [0]:
%sql
-- Initial contents of the target table.
SELECT *
FROM customer_target;

cust_id,fname,lname,email,contact_number,city_name,state_code,signup_date
1,John,Doe,john.doe@email.com,555-0101,Seattle,WA,2023-01-15
2,Jane,Smith,jane.updated@email.com,555-0202,Portland,OR,2023-02-20
4,Alice,Williams,alice.w@email.com,555-0104,San Diego,CA,2023-04-05
5,Carol,Brown,carol.b@email.com,555-0105,Denver,CO,2023-05-01


In [0]:
%sql
-- Create source table (current customer data) with extra loyalty_points column.
-- Column names deliberately differ from customer_target (customer_id vs cust_id, ...).
-- CREATE OR REPLACE resets the table each run so the INSERT below does not duplicate rows on re-run.
CREATE OR REPLACE TABLE customer_source (
    customer_id INT,
    first_name STRING,
    last_name STRING,
    email_address STRING,
    phone_number STRING,
    city STRING,
    state STRING,
    registration_date DATE,
    loyalty_points INT
)
USING DELTA;

INSERT INTO customer_source VALUES
    (1, 'John', 'Doe', 'john.doe@email.com', '555-0101', 'Seattle', 'WA', '2023-01-15', 500),
    (2, 'Jane', 'Smith', 'jane.smith@email.com', '555-0102', 'Portland', 'OR', '2023-02-20', 750),
    (3, 'Bob', 'Johnson', 'bob.j@email.com', '555-0103', 'San Francisco', 'CA', '2023-03-10', 100),
    (4, 'Alice', 'Williams', 'alice.w@email.com', '555-0104', 'Los Angeles', 'CA', '2023-04-05', 300);

num_affected_rows,num_inserted_rows
4,4


In [0]:
%sql
-- Contents of the source table (note the different column names and the extra loyalty_points).
SELECT *
FROM customer_source;

customer_id,first_name,last_name,email_address,phone_number,city,state,registration_date,loyalty_points
1,John,Doe,john.doe@email.com,555-0101,Seattle,WA,2023-01-15,500
2,Jane,Smith,jane.smith@email.com,555-0102,Portland,OR,2023-02-20,750
3,Bob,Johnson,bob.j@email.com,555-0103,San Francisco,CA,2023-03-10,100
4,Alice,Williams,alice.w@email.com,555-0104,Los Angeles,CA,2023-04-05,300


#### Merging the target table with source table
* Case 1: Insert all (`INSERT *`) when source and target column names differ
* Case 2: Update all (`UPDATE SET *`) when source and target column names differ
* Case 3: The source table has more columns than the target table

In [0]:
%sql
-- EXPECTED TO FAIL (Case 1): INSERT * expands to the target's columns (cust_id, fname, ...)
-- and resolves each one by name against the source. customer_source uses different names
-- (customer_id, first_name, ...), so cust_id cannot be resolved. See the error recorded below.
MERGE INTO customer_target tgt 
USING customer_source src 
on tgt.cust_id = src.customer_id
WHEN NOT MATCHED THEN
INSERT *

In [0]:
# ERROR : AnalysisException: cannot resolve cust_id in UPDATE clause given columns src.customer_id, src.first_name, src.last_name, src.email_address, src.phone_number, src.city, src.state, src.registration_date, src.loyalty_points; line 1 pos 0
# The target column cust_id has no same-named column in the source, so the * expansion fails.
# NOTE(review): the MERGE cell above only has an INSERT * clause, yet this message names the
# UPDATE clause. It may have been captured from an UPDATE SET * attempt; confirm and re-capture.



#### KEY: When performing update all (`UPDATE SET *`) or insert all (`INSERT *`), every target column must have a same-named column in the source. Columns are matched by name, not by position.

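* Modified approach: rename the source columns to the target's column names with a DataFrame `select`/`alias`, register the result as a temp view, and merge from that view.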

In [0]:
# Load customer_source as a DataFrame; it still has the source column names (customer_id, ...).
df = spark.read.table('my_db.customer_source')

In [0]:
# Print the source schema; these column names differ from customer_target's.
df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email_address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- registration_date: date (nullable = true)
 |-- loyalty_points: integer (nullable = true)



In [0]:
# Rename each customer_source column to its customer_target name, so that
# MERGE ... INSERT * / UPDATE SET * can resolve every target column by name.
# loyalty_points has no counterpart in the target and keeps its name (Case 3).
rename_map = {
    'customer_id': 'cust_id',
    'first_name': 'fname',
    'last_name': 'lname',
    'email_address': 'email',
    'phone_number': 'contact_number',
    'city': 'city_name',
    'state': 'state_code',
    'registration_date': 'signup_date',
}
df_new = df.select(
    *[df[src_col].alias(tgt_col) for src_col, tgt_col in rename_map.items()],
    df['loyalty_points'],
)

In [0]:
# Show up to 19 rows with full (untruncated) column values.
df_new.show(n=19, truncate=False)

+-------+-----+--------+--------------------+--------------+-------------+----------+-----------+--------------+
|cust_id|fname|lname   |email               |contact_number|city_name    |state_code|signup_date|loyalty_points|
+-------+-----+--------+--------------------+--------------+-------------+----------+-----------+--------------+
|1      |John |Doe     |john.doe@email.com  |555-0101      |Seattle      |WA        |2023-01-15 |500           |
|2      |Jane |Smith   |jane.smith@email.com|555-0102      |Portland     |OR        |2023-02-20 |750           |
|3      |Bob  |Johnson |bob.j@email.com     |555-0103      |San Francisco|CA        |2023-03-10 |100           |
|4      |Alice|Williams|alice.w@email.com   |555-0104      |Los Angeles  |CA        |2023-04-05 |300           |
+-------+-----+--------+--------------------+--------------+-------------+----------+-----------+--------------+



In [0]:
# Expose the renamed DataFrame as temp view customer_src so the %sql MERGE cells can use it as the source.
df_new.createOrReplaceTempView(name='customer_src')

In [0]:
%sql
-- Case 1 (working): customer_src now uses the target's column names, so INSERT * resolves.
-- Only source rows whose cust_id is not already in the target are inserted.
MERGE INTO customer_target AS tgt
USING customer_src AS src
  ON tgt.cust_id = src.cust_id
WHEN NOT MATCHED THEN
  INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,0,0,1


In [0]:
%sql
-- After the insert-only MERGE, Bob (cust_id 3) has been added and the existing rows are unchanged.
SELECT *
FROM customer_target;

cust_id,fname,lname,email,contact_number,city_name,state_code,signup_date
1,John,Doe,john.doe@email.com,555-0101,Seattle,WA,2023-01-15
2,Jane,Smith,jane.updated@email.com,555-0202,Portland,OR,2023-02-20
4,Alice,Williams,alice.w@email.com,555-0104,San Diego,CA,2023-04-05
5,Carol,Brown,carol.b@email.com,555-0105,Denver,CO,2023-05-01
3,Bob,Johnson,bob.j@email.com,555-0103,San Francisco,CA,2023-03-10


In [0]:
%sql
-- Case 2: UPDATE SET * overwrites every target column of each matched row with the
-- same-named source column. loyalty_points has no target column and is dropped (Case 3).
MERGE INTO customer_target AS tgt
USING customer_src AS src
  ON tgt.cust_id = src.cust_id
WHEN MATCHED THEN
  UPDATE SET *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
4,4,0,0


In [0]:
%sql
-- Matched rows now carry the source values (e.g. Jane's email, Alice's city). Carol (5) has
-- no source row and is untouched. No loyalty_points column was added to the target.
SELECT *
FROM customer_target;

cust_id,fname,lname,email,contact_number,city_name,state_code,signup_date
1,John,Doe,john.doe@email.com,555-0101,Seattle,WA,2023-01-15
2,Jane,Smith,jane.smith@email.com,555-0102,Portland,OR,2023-02-20
3,Bob,Johnson,bob.j@email.com,555-0103,San Francisco,CA,2023-03-10
4,Alice,Williams,alice.w@email.com,555-0104,Los Angeles,CA,2023-04-05
5,Carol,Brown,carol.b@email.com,555-0105,Denver,CO,2023-05-01


#### KEY : 
* When performing update all (`UPDATE SET *`) or insert all (`INSERT *`),
any source columns that do not exist in the target table (here, `loyalty_points`)
are silently dropped by the MERGE statement.

#### SCHEMA EVOLUTION
1. Over time, the source data may evolve and gain new columns that also need to be loaded into the target.
2. In this case, there are two approaches:
    * a) Manual schema change (an `ALTER TABLE` statement) **Recommended**
    * b) Automatic schema update (enabled through a configuration setting)

**Config: `SET spark.databricks.delta.schema.autoMerge.enabled = true`**
* This is a session-level setting. Once set, it applies to every subsequent command in the current Spark session (i.e. the rest of this notebook), not only to a single MERGE.

#### MergeSchema Option

* The `mergeSchema` option in Delta Lake lets a DataFrame write (e.g. `.option('mergeSchema', True)` on an append) automatically evolve the schema of the target Delta table. If the source data contains additional columns that are not present in the target table, those columns are added to the target table's schema.

In [0]:
%sql
-- 3-column target for the mergeSchema demo.
-- CREATE OR REPLACE keeps Run All repeatable. target_table is recreated later in this notebook
-- and never dropped afterwards, so a plain CREATE TABLE would fail on the next run.
CREATE OR REPLACE TABLE target_table (
    id INT,
    name STRING,
    age INT
)
USING DELTA;


In [0]:
%sql
-- Seed the 3-column target with two rows.
INSERT INTO target_table (id, name, age) VALUES
    (1, 'John',  25),
    (2, 'Alice', 30);


num_affected_rows,num_inserted_rows
2,2


In [0]:
%sql
-- Target before the schema-merging append: only id, name, age.
SELECT *
FROM target_table;

id,name,age
1,John,25
2,Alice,30


In [0]:
%sql
-- 5-column source: same id/name/age as the target plus two new columns (city, country).
-- CREATE OR REPLACE lets the cell re-run even if an earlier run stopped before the DROP below.
CREATE OR REPLACE TABLE source_table (
    id INT,
    name STRING,
    age INT,
    city STRING,
    country STRING
)
USING DELTA;

INSERT INTO source_table VALUES
    (1, 'John', 26, 'San Francisco', 'USA'),
    (3, 'Bob', 28, 'Chicago', 'USA');


num_affected_rows,num_inserted_rows
2,2


In [0]:
%sql
-- Source rows, including the extra city/country columns.
SELECT *
FROM source_table;

id,name,age,city,country
1,John,26,San Francisco,USA
3,Bob,28,Chicago,USA


* The source table has 5 columns; the target table has 3 columns.
* To merge the source table's schema into the target table during the write, we can use the **mergeSchema option**. Existing target rows get NULL in the newly added columns.

In [0]:
# Read the 5-column source table; its extra columns (city, country) are what mergeSchema will add to the target.
df = spark.table('my_db.source_table')

In [0]:
# Append the source rows to target_table as Delta. mergeSchema lets this write add the
# source-only columns (city, country) to the target schema instead of rejecting them.
df.write.saveAsTable(
    'my_db.target_table',
    format='delta',
    mode='append',
    mergeSchema=True,
)

In [0]:
%sql
-- target_table now has city/country. The two pre-existing rows are NULL in those columns.
-- Being an append, John (id 1) now appears twice.
SELECT *
FROM my_db.target_table;

id,name,age,city,country
1,John,26,San Francisco,USA
3,Bob,28,Chicago,USA
1,John,25,,
2,Alice,30,,


#### overwriteSchema Option

* Sometimes we want to change the data type of an existing column (here, `age` from INT to FLOAT).
* In this case, we can use the **overwriteSchema option** together with `mode('overwrite')`; both the table's data and its schema are replaced.

In [0]:
%sql
-- Reset the demo tables before the overwriteSchema example.
-- IF EXISTS keeps the cell from failing when a table is already gone (e.g. after a partial run).
DROP TABLE IF EXISTS source_table;
DROP TABLE IF EXISTS target_table;

In [0]:
%sql
-- Fresh 3-column target for the overwriteSchema demo; age starts out as INT.
CREATE TABLE target_table (
    id   INT,
    name STRING,
    age  INT
)
USING DELTA;

INSERT INTO target_table (id, name, age) VALUES
    (1, 'John',  25),
    (2, 'Alice', 30);




num_affected_rows,num_inserted_rows
2,2


In [0]:
%sql
-- Target before the type change.
SELECT *
FROM target_table;

id,name,age
1,John,25
2,Alice,30


In [0]:
# Read the current target_table (age is INT) as the input for the type-changing rewrite.
df = spark.table('my_db.target_table')

In [0]:
# Rewrite target_table with age cast from INT to FLOAT. Changing a column's type replaces the
# schema, so the write uses mode='overwrite' together with overwriteSchema.
age_as_float = df['age'].cast('float').alias('age')
df.select(df['id'], df['name'], age_as_float).write.saveAsTable(
    'my_db.target_table',
    format='delta',
    mode='overwrite',
    overwriteSchema=True,
)

In [0]:
%sql
-- Confirm the schema change: age should now be reported as float.
DESCRIBE TABLE my_db.target_table;

col_name,data_type,comment
id,int,
name,string,
age,float,
