# Incremental load

#### Create database on loaction

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS ssd_spark_db location '/dbfs/FileStore/SSD_01'

### Read data from file to DataFrame

In [0]:
read_df=spark.read.option("inferSchema" , True).option("header",True).csv("dbfs:/FileStore/SSD_01/EMPLOYEE")

####Display DataFrame content

In [0]:
display(read_df)

First_Name,Last_Name,emp_id,emp_dept,work_location,Email
Nicholas Treutel,Prof. Norwood Hickle,1,ENGNEER,KOL,king.micheal@gmail.com
Micaela Leannon,Mrs. Caleigh Breitenberg,2,ENGNEER,KOL,wlebsack@kohler.com
Miss Alysha Tillman MD,Lewis Stamm,3,ENGNEER,KOL,qwalter@hotmail.com
Prof. Pauline Paucek,Dr. Edna Emard I,4,ENGNEER,KOL,lehner.elnora@yahoo.com
Prof. Elena Heidenreich Jr.,Emmie Crona,5,ENGNEER,KOL,balistreri.stanton@friesen.com
Yasmeen Abbott,Wanda Prosacco,6,ENGNEER,KOL,moen.merl@keebler.com
Hoyt Zieme,Vena Schinner,7,ENGNEER,KOL,mante.jackeline@yahoo.com
Lloyd Murazik,Dr. Dasia Welch Sr.,8,ENGNEER,KOL,ioconnell@white.com
Brenda Friesen,Lucious Veum,9,ENGNEER,KOL,jsatterfield@gmail.com
Alta Hirthe,Susie Maggio,10,ENGNEER,KOL,parisian.jennie@yahoo.com


### Add audit_creation_datetime column into dataframe

In [0]:
from pyspark.sql.functions import current_timestamp

In [0]:
read_df_02=read_df.withColumn("audit_creation_datetime",current_timestamp())

In [0]:
read_df_02.show()

+--------------------+--------------------+------+--------+-------------+--------------------+-----------------------+
|          First_Name|           Last_Name|emp_id|emp_dept|work_location|               Email|audit_creation_datetime|
+--------------------+--------------------+------+--------+-------------+--------------------+-----------------------+
|    Nicholas Treutel|Prof. Norwood Hickle|     1| ENGNEER|          KOL|king.micheal@gmai...|   2023-03-09 15:58:...|
|     Micaela Leannon|Mrs. Caleigh Brei...|     2| ENGNEER|          KOL| wlebsack@kohler.com|   2023-03-09 15:58:...|
|Miss Alysha Tillm...|         Lewis Stamm|     3| ENGNEER|          KOL| qwalter@hotmail.com|   2023-03-09 15:58:...|
|Prof. Pauline Paucek|    Dr. Edna Emard I|     4| ENGNEER|          KOL|lehner.elnora@yah...|   2023-03-09 15:58:...|
|Prof. Elena Heide...|         Emmie Crona|     5| ENGNEER|          KOL|balistreri.stanto...|   2023-03-09 15:58:...|
|      Yasmeen Abbott|      Wanda Prosacco|     

### writing data into employee table

In [0]:
read_df_02.write.format("delta").mode("append").saveAsTable("ssd_spark_db.employee")

### Update data or Data Transformation if needed

In [0]:
%sql
update ssd_spark_db.employee set emp_dept='ENGINEER'
WHERE emp_dept='ENGNEER'

num_affected_rows
77


In [0]:
%sql
SELECT * from ssd_spark_db.employee;

First_Name,Last_Name,emp_id,emp_dept,work_location,Email,audit_creation_datetime
Nicholas Treutel,Prof. Norwood Hickle,1,ENGINEER,KOL,king.micheal@gmail.com,2023-03-09T15:58:49.533+0000
Micaela Leannon,Mrs. Caleigh Breitenberg,2,ENGINEER,KOL,wlebsack@kohler.com,2023-03-09T15:58:49.533+0000
Miss Alysha Tillman MD,Lewis Stamm,3,ENGINEER,KOL,qwalter@hotmail.com,2023-03-09T15:58:49.533+0000
Prof. Pauline Paucek,Dr. Edna Emard I,4,ENGINEER,KOL,lehner.elnora@yahoo.com,2023-03-09T15:58:49.533+0000
Prof. Elena Heidenreich Jr.,Emmie Crona,5,ENGINEER,KOL,balistreri.stanton@friesen.com,2023-03-09T15:58:49.533+0000
Yasmeen Abbott,Wanda Prosacco,6,ENGINEER,KOL,moen.merl@keebler.com,2023-03-09T15:58:49.533+0000
Hoyt Zieme,Vena Schinner,7,ENGINEER,KOL,mante.jackeline@yahoo.com,2023-03-09T15:58:49.533+0000
Lloyd Murazik,Dr. Dasia Welch Sr.,8,ENGINEER,KOL,ioconnell@white.com,2023-03-09T15:58:49.533+0000
Brenda Friesen,Lucious Veum,9,ENGINEER,KOL,jsatterfield@gmail.com,2023-03-09T15:58:49.533+0000
Alta Hirthe,Susie Maggio,10,ENGINEER,KOL,parisian.jennie@yahoo.com,2023-03-09T15:58:49.533+0000


### Same for second Table as well

In [0]:
read_df=spark.read.option("inferSchema" , True).option("header",True).csv("dbfs:/FileStore/SSD_01/DEPARTMENT")
from pyspark.sql.functions import current_timestamp
read_df_02=read_df.withColumn("audit_creation_datetime",current_timestamp())
read_df_02.write.format("delta").mode("append").saveAsTable("ssd_spark_db.department")

In [0]:
%sql
select * from ssd_spark_db.department;

emp_id,emp_rating,salary,phone_number,username,audit_creation_datetime
1,1,30,1-223-451-3212,ardella52,2023-03-09T15:59:22.095+0000
2,8,219930,-3133,fernando20,2023-03-09T15:59:22.095+0000
3,7,919937953,-7003,gerlach.adolphus,2023-03-09T15:59:22.095+0000
4,3,1739766,(707) 976-4067,swaniawski.sydnie,2023-03-09T15:59:22.095+0000
5,1,8,(480) 347-6134,nolan.roderick,2023-03-09T15:59:22.095+0000
6,6,35154298,+1.719.286.6459,maximilian.lesch,2023-03-09T15:59:22.095+0000
7,1,9,458.650.4840,rachael08,2023-03-09T15:59:22.095+0000
8,4,7179,267.696.4410,icremin,2023-03-09T15:59:22.095+0000
9,7,605867,+1.804.317.4665,wshields,2023-03-09T15:59:22.095+0000
10,7,8599003,(941) 690-2134,rosalyn30,2023-03-09T15:59:22.095+0000


In [0]:
%sql
describe table ssd_spark_db.department;

col_name,data_type,comment
emp_id,int,
emp_rating,int,
salary,int,
phone_number,string,
username,string,
audit_creation_datetime,timestamp,


In [0]:
%sql
CREATE TABLE IF NOT EXISTS ssd_spark_db.control_table_02 (
    Serial_id int ,
    Table_name varchar(255),
    Status varchar(255),
    Last_Update_date  string
)


## Insert one starting  data into control Table

In [0]:
#%sql
#insert into ssd_spark_db.control_table_02(Serial_id,Table_name,Status,Last_Update_date) values(1,"target_table_x","completed",current_timestamp)

In [0]:
%sql 
select * from ssd_spark_db.control_table_02;

Serial_id,Table_name,Status,Last_Update_date
1,target_table_x,completed,2023-03-06 10:23:17.254


In [0]:
%sql
INSERT INTO ssd_spark_db.control_table_02 SELECT * FROM (
with cte_01 as
(
SELECT MAX(Serial_id) as id_1 from ssd_spark_db.control_table_02
),
cte_02 as
(
select c.id_1+1 as Serial_id,
'target_table_x' as Table_name,
'InProgress' as Status,
current_timestamp as Last_Update_date
from cte_01 c
)
SELECT * FROM cte_02)

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
SELECT * FROM ssd_spark_db.control_table_02

Serial_id,Table_name,Status,Last_Update_date
2,target_table_x,InProgress,2023-03-09 15:59:30.535
1,target_table_x,completed,2023-03-06 10:23:17.254


##Increamtal data processing

In [0]:
%sql
CREATE TABLE IF NOT EXISTS ssd_spark_db.target_x (
    emp_id int ,
    FULL_NAME varchar(255),
    emp_dept varchar(255),
    work_location  varchar(255),
    username varchar(255),
    emp_rating  varchar(255),
    phone_number varchar(255),
    email varchar(255),
    salary int
)

In [0]:
%sql
CREATE TABLE IF NOT EXISTS ssd_spark_db.target_x_dl (
    emp_id int ,
    FULL_NAME varchar(255),
    emp_dept varchar(255),
    work_location  varchar(255),
    username varchar(255),
    emp_rating  varchar(255),
    phone_number varchar(255),
    email varchar(255),
    salary int
)

In [0]:
%sql
INSERT INTO ssd_spark_db.target_x SELECT * FROM (
with cte_date as 
(
select max(Last_Update_date) as min_date 
from ssd_spark_db.control_table_02
WHERE Status='completed'
),
CTE_DATA as
(
select 
e1.emp_id AS emp_id,
(e1.First_Name || e1.Last_name) AS FULL_NAME,
e1.emp_dept AS emp_dept,
e1.work_location AS work_location,
d.username AS username,
d.emp_rating AS emp_rating,
d.phone_number AS phone_number,
e1.email AS email,
d.salary AS salary,
row_number () over( partition by e1.emp_id order by e1.audit_creation_datetime desc ) rank_01
from 
ssd_spark_db.employee e1
left outer join 
ssd_spark_db.department d
on 
e1.emp_id =d.emp_id
cross join cte_date c
where e1.audit_creation_datetime > c.min_date
--AND e1.audit_creation_datetime <= current_timestamp OPTIONAL IT WILL HELP WHEN WE WILL DO THE HISTORY LOAD BETWEEN 2 TIMESTAMP 
)
select 
emp_id ,
FULL_NAME ,
emp_dept ,
work_location , 
username ,
emp_rating,
phone_number,   
email,
salary 
from CTE_DATA
WHERE rank_01=1)

num_affected_rows,num_inserted_rows
500,500


In [0]:
%sql
SELECT * FROM ssd_spark_db.target_x

emp_id,FULL_NAME,emp_dept,work_location,username,emp_rating,phone_number,email,salary
1,Nicholas TreutelProf. Norwood Hickle,ENGINEER,KOL,ardella52,1,1-223-451-3212,king.micheal@gmail.com,30
2,Micaela LeannonMrs. Caleigh Breitenberg,ENGINEER,KOL,fernando20,8,-3133,wlebsack@kohler.com,219930
3,Miss Alysha Tillman MDLewis Stamm,ENGINEER,KOL,gerlach.adolphus,7,-7003,qwalter@hotmail.com,919937953
4,Prof. Pauline PaucekDr. Edna Emard I,ENGINEER,KOL,swaniawski.sydnie,3,(707) 976-4067,lehner.elnora@yahoo.com,1739766
5,Prof. Elena Heidenreich Jr.Emmie Crona,ENGINEER,KOL,nolan.roderick,1,(480) 347-6134,balistreri.stanton@friesen.com,8
6,Yasmeen AbbottWanda Prosacco,ENGINEER,KOL,maximilian.lesch,6,+1.719.286.6459,moen.merl@keebler.com,35154298
7,Hoyt ZiemeVena Schinner,ENGINEER,KOL,rachael08,1,458.650.4840,mante.jackeline@yahoo.com,9
8,Lloyd MurazikDr. Dasia Welch Sr.,ENGINEER,KOL,icremin,4,267.696.4410,ioconnell@white.com,7179
9,Brenda FriesenLucious Veum,ENGINEER,KOL,wshields,7,+1.804.317.4665,jsatterfield@gmail.com,605867
10,Alta HirtheSusie Maggio,ENGINEER,KOL,rosalyn30,7,(941) 690-2134,parisian.jennie@yahoo.com,8599003


##inserting remaining data into Target_dl table(Droped Data or Duplicate Data or Unsatified Data)

In [0]:
%sql
INSERT INTO ssd_spark_db.target_x_dl SELECT * FROM (
select 
emp_id ,
FULL_NAME ,
emp_dept ,
work_location , 
username ,
emp_rating,
phone_number,   
email,
salary 
from CTE_DATA
WHERE rank_01 !=1)

num_affected_rows,num_inserted_rows
0,0


In [0]:
%sql
SELECT * FROM ssd_spark_db.target_x_dl

emp_id,FULL_NAME,emp_dept,work_location,username,emp_rating,phone_number,email,salary


### Updting control tabble

In [0]:
%sql
update ssd_spark_db.control_table_02
set Status="completed", Last_Update_date =current_timestamp
where Serial_id IN (select MAX(Serial_id) from ssd_spark_db.control_table_02)

num_affected_rows
1


In [0]:
%sql
select * FROM ssd_spark_db.control_table_02

Serial_id,Table_name,Status,Last_Update_date
1,target_table_x,completed,2023-03-06 10:23:17.254
2,target_table_x,completed,2023-03-09 15:59:56.357
