In [0]:
from pyspark.sql.functions import current_timestamp,when,col,coalesce,lit
from pyspark.sql.types import StructField,StructType,StringType,IntegerType,TimestampType
from delta.tables import *

In [0]:
# Reset step: remove whatever is stored under the Delta table's storage path
# (the same path used as LOCATION when emp.scd2 is created further down).
# The second positional argument (True) is dbutils.fs.rm's recurse flag, so
# the whole directory tree is deleted, not just a single file.
dbutils.fs.rm("/FileStore/tables/scd2",True)

In [0]:
%sql
-- Reset step: unregister the demo table so the notebook can recreate it from scratch.
-- IF EXISTS keeps this cell from failing when the table is not registered.
DROP TABLE IF EXISTS emp.scd2

In [0]:
%sql
-- Create the demo target table and seed it with two current records.
CREATE SCHEMA IF NOT EXISTS emp;

CREATE TABLE IF NOT EXISTS emp.scd2
(
  id       INT,
  name     STRING,
  mobile   STRING,
  version  INT,        -- 0 = current record; the Status recomputation later in this notebook
                       -- labels version 0 'Active' and every other value 'InActive'
  datetime TIMESTAMP,
  Status   STRING      -- 'Active' or 'InActive'
)
USING DELTA
LOCATION '/FileStore/tables/scd2';

-- Both seed rows are current records, so both carry version 0.
-- (Row 1 used to be seeded as version 1 + 'Active', which contradicts the
-- version -> Status rule applied later: an untouched row with version 1 would
-- be relabelled 'InActive' by the load even though nothing superseded it.)
INSERT INTO emp.scd2 VALUES
  (1, 'Vinoth Sakkaraivel', '8884117632', 0, current_timestamp(), 'Active'),
  (2, 'Sathya',             '8792810778', 0, current_timestamp(), 'Active');

In [0]:
%sql
-- Show the seeded contents of the target table.
SELECT *
FROM emp.scd2;

In [0]:
# `dt` is the DeltaTable handle used later for the merge-based delete;
# `df` is the same table exposed as a DataFrame for the joins below.
dt = DeltaTable.forName(spark, 'emp.scd2')

df = dt.toDF()

df.display()

In [0]:
# Incoming batch as (id, name, mobile) tuples.
# ids 1 and 2 are also present in the seeded table; ids 3 and 4 are not.
emp_data = [
    (1, 'Vinoth S', '+91 8884117632'),
    (3, 'ThanuMithra', '9999999989'),
    (4, 'SirpigaVS', '1111111'),
    (2, 'Sathya A', '+91 8792810778'),
]

# All three source columns are declared nullable.
emp_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('id', IntegerType()),
        ('name', StringType()),
        ('mobile', StringType()),
    )
])

# Stamp the batch with the load time.
df1 = (
    spark.createDataFrame(emp_data, emp_schema)
    .withColumn('datetime', current_timestamp())
)
df1.display()

In [0]:
# Full outer join of the current table (df) with the incoming batch (df1) on id.
# All table columns are kept as-is; the batch's id/name/mobile are carried
# alongside under an "nw" prefix so both sides can be compared row by row.
df2 = (
    df.join(df1, on=df.id == df1.id, how='outer')
    .select(
        df['*'],
        df1.id.alias('nwid'),
        df1.name.alias('nwname'),
        df1.mobile.alias('nwmobile'),
    )
)
df2.display()

In [0]:
# Classify every joined row:
#   'NA'     - id exists only in the table (nothing incoming for it)
#   'New'    - id exists only in the incoming batch
#   'Update' - everything else (id present on both sides)
# NOTE(review): 'Update' is assigned without comparing name/mobile, so an
# incoming row identical to the stored one is still treated as a change.
# Confirm that is intended.
only_in_table = df2.nwid.isNull() & df2.id.isNotNull()
only_in_batch = df2.nwid.isNotNull() & df2.id.isNull()
change_type = (
    when(only_in_table, 'NA')
    .when(only_in_batch, 'New')
    .otherwise('Update')
)

# The existing Status/version columns are overwritten: Status now holds the
# change type and Version is reset to 0 on every row.
df2 = df2.withColumn('Status', change_type).withColumn('Version', lit(0))

# Rows with no incoming counterpart need no further processing.
df2 = df2.filter(df2.Status != 'NA')
df2.display()

In [0]:
# Working handle for the rows that will be rebuilt below. This is a plain
# assignment, so df_copy and df refer to the same DataFrame object here;
# df_copy is rebound to a new DataFrame in the next cell.
df_copy = df
df_copy.display()

In [0]:
# Brand-new ids: keep only the incoming values and give them the attributes of
# a current record (version 0, load timestamp, 'Active').
df_new = (
    df2.filter(df2.Status == 'New')
    .select(df2.nwid, df2.nwname, df2.nwmobile)
    .withColumns({
        'Version': lit(0),
        "datetime": current_timestamp(),
        "Status": lit("Active"),
    })
)
df_new.display()

# unionAll matches columns by position, not by name: df_new's
# (nwid, nwname, nwmobile, Version, datetime, Status) line up with df_copy's
# (id, name, mobile, version, datetime, Status).
df_copy = df_copy.unionAll(df_new)
df_copy.display()


In [0]:
# Incoming values for ids that already exist in the table. Version is the
# constant 0 set during classification, i.e. these rows become the new
# current records.
df_up = (
    df2.filter(df2.Status == 'Update')
    .select(df2.nwid, df2.nwname, df2.nwmobile, df2.Version)
)
df_up.display()

In [0]:
# Decide the version of every existing/new row after this load.
# A left join keeps all of df_copy; a non-null nwid means the id has an
# incoming update.
has_incoming_update = df_up.nwid.isNotNull()
no_incoming_update = df_up.nwid.isNull() & df_copy.id.isNotNull()

# Superseded rows get version 1; all other rows keep their stored version.
# Rows satisfying neither condition end up with a null newVersion (no otherwise()).
# NOTE(review): every superseded row becomes version 1 regardless of its
# previous version. Confirm version is meant as a 0/1 flag, not a counter.
# The joined frame holds both `version` (from df_copy) and `Version` (from
# df_up), so the stored version is referenced through df_copy, not by bare name.
df3 = (
    df_copy.join(df_up, on=df_copy.id == df_up.nwid, how='left')
    .withColumn(
        'newVersion',
        when(has_incoming_update, lit(1)).when(no_incoming_update, df_copy.version),
    )
)

# Keep only the stored attributes plus the recomputed version.
df4 = df3.select(df3.id, df3.name, df3.mobile, df3.newVersion)
df4.display()

In [0]:
# Assemble the staged result: re-versioned existing/new rows (df4) followed by
# the incoming replacements (df_up). unionAll is positional, so df_up's
# (nwid, nwname, nwmobile, Version) land in (id, name, mobile, newVersion).
# Every row, history included, is stamped with the current load time.
# Note: this rebinds `df`, which until now held the table snapshot.
df = df4.unionAll(df_up).withColumn('datetime', current_timestamp())

# Status is derived purely from the version: 0 -> 'Active', anything else
# (including null) -> 'InActive'.
is_current = df.newVersion == 0
df = df.withColumn('Status', when(is_current, 'Active').otherwise('InActive'))
df.display()

In [0]:
# Count staged rows per (id, Status) to find ids with more than one row of the
# same Status (in practice: several 'InActive' history rows for one id).
df_dup = df.groupBy("id", "Status").count().sort("id")

# A duplicate is any group with more than one row. The previous threshold
# (count > 2) skipped the common case of exactly two rows: the target's
# InActive row was then left in place and the final MERGE, which matches on
# (id, Status), paired that single target row with two source rows, which
# Delta rejects as an ambiguous match. Flagging count > 1 lets the next cell
# clear those target rows so the MERGE re-inserts the full history instead.
df_temp = df_dup.filter(col('count') > 1)
df_temp.display()

In [0]:
# Remove 'InActive' rows from the Delta table for every id listed in df_temp.
# The table is displayed before and after so the effect is visible.
dt.toDF().display()

inactive_row_match = "(dt.id = ds.id) and (dt.status = 'InActive')"
(
    dt.alias("dt")
    .merge(df_temp.alias("ds"), inactive_row_match)
    .whenMatchedDelete()
    .execute()
)

dt.toDF().display()

In [0]:
# Expose the staged DataFrame to the %sql cells below under the name
# tmp_emp_scd2 (replaces any temp view already registered with that name).
df.createOrReplaceTempView('tmp_emp_scd2')

In [0]:
%sql
-- Inspect the staged rows that will be merged into the target table.
SELECT *
FROM tmp_emp_scd2;

In [0]:
%sql
-- Target table as it stands right before the merge.
SELECT *
FROM emp.scd2

In [0]:
%sql
-- Apply the staged rows to the target table.
-- Rows pair up on (id, Status):
--   * a target row with a staged counterpart is overwritten with the staged values
--   * a staged row without a counterpart is inserted
-- The staged column newVersion is written to the target's version column.
MERGE INTO emp.scd2 AS tgt
USING tmp_emp_scd2 AS src
   ON tgt.id = src.id
  AND tgt.Status = src.Status
WHEN MATCHED THEN UPDATE SET
  tgt.id       = src.id,
  tgt.name     = src.name,
  tgt.mobile   = src.mobile,
  tgt.version  = src.newVersion,
  tgt.datetime = src.datetime,
  tgt.Status   = src.Status
WHEN NOT MATCHED THEN INSERT
  (id, name, mobile, version, datetime, Status)
VALUES
  (src.id, src.name, src.mobile, src.newVersion, src.datetime, src.Status)

In [0]:
%sql
-- Final state of the target table, sorted by its first column (ORDER BY ordinal 1).
SELECT *
FROM emp.scd2
ORDER BY 1;