In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import types as T


In [2]:

spark = SparkSession.builder.master("local[2]").appName("Spark-Query").getOrCreate()

### Read Data

In [3]:
def read_data(spark, file_path):
    source_data = spark.read.format("csv").options(path=file_path,header=True,inferSchema = True,delimiter = ",").load()
    return source_data

### Write Data

In [4]:
def write_df(target_dataframe):
    if len(final_records.head(1)) > 0:
        target_file_path ="E:\\workforce\\spark\\data\\scd_type_data\\temp\\target_emp.csv"
        target_dataframe.coalesce(1).write.format("csv").\
        mode('overwrite').option("path",target_file_path).option('header',True).save()
        return 1
    else:
        return 0
    
def pd_write(target_dataframe):
    if len(final_records.head(1)) > 0:
        target_file_path ="E:\\workforce\\spark\\data\\scd_type_data\\temp\\target_emp.csv"
        pdframe = final_records.toPandas()
        pdframe.to_csv(target_file_path,index = False)
        return 1
    else:
        return 0
    

### Source Data

In [5]:
file_path ="E:\workforce\spark\data\scd_type_data\source_emp_day1.csv"
source_df = read_data(spark, file_path)
source_df.show()

+---+------+-------------------+-------+----------+
| id|  name|              email|country|created_at|
+---+------+-------------------+-------+----------+
|  1|  KHAN|  khan101@gmail.com|  INDIA|2024-01-05|
|  2|TAUKIR|taukir101@gmail.com| CANADA|2024-01-06|
+---+------+-------------------+-------+----------+



### Target Data

In [6]:
file_path ="E:\\workforce\\spark\\data\\scd_type_data\\target_emp_day1.csv"
target_df = read_data(spark, file_path)
target_df.show()

+---+----+-----+-------+----------+--------+---------+
| id|name|email|country|valid_from|valid_to|is_active|
+---+----+-----+-------+----------+--------+---------+
+---+----+-----+-------+----------+--------+---------+



In [7]:
def get_new_data(source_df,target_df):
    
    return source_df.join(target_df,source_df.id == target_df.id,'leftanti' ).\
    withColumn('valid_from',source_df.created_at).\
    withColumn('valid_to',lit('9999-01-01')).\
    withColumn('is_active',lit('Y')).\
    drop(source_df.created_at)
    
"""
The left anti join in PySpark is similar to the join functionality,
but it returns only columns from the left DataFrame for non-matched records.
"""
#target_Schema = 'id integer,name string,email string,country string, valid_from date,valid_to date, is_active string'
def deactivate_records(source_df,target_df):
    
    deactivate_rec =  source_df.join(target_df,source_df.id == target_df.id).\
            filter((target_df.is_active == 'Y') & 
                   (concat(source_df.name,source_df.email,source_df.country) != 
                   concat(target_df.name,target_df.email,target_df.country))).\
                        withColumn('is_active',lit('N')).\
                        withColumn('valid_from',source_df.created_at).\
                        withColumn('valid_to',current_date()).\
                select(target_df.id,target_df.name,target_df.email,
                       target_df.country,col('valid_from'),col('valid_to'),col('is_active'))
   

    deactivate_join_cond = [target_df.id == deactivate_rec.id,target_df.is_active == 'Y',target_df.valid_to == '9999-01-01']
    filter_target = target_df.alias('target').join(deactivate_rec.alias('deactivate') ,how='leftanti',on=deactivate_join_cond).\
                     select(col("target.id"),col("target.name"),col("target.email"),
                            col("target.country"),col("target.valid_from"),
                            col("target.valid_to"),col("target.is_active"))
                        
    
    updated_recs = filter_target.union(deactivate_rec)
    
    join_cond = [source_df.id == target_df.id,target_df.is_active == 'Y',
                 concat(source_df.name,source_df.email,source_df.country) != 
                   concat(target_df.name,target_df.email,target_df.country)]
    
    non_updated_recs =  source_df.join(target_df,how='inner',on=join_cond).\
                        withColumn('is_active',lit('Y')).\
                        withColumn('valid_from',source_df.created_at).\
                        withColumn('valid_to',lit('9999-01-01')).\
                select(source_df.id,source_df.name,source_df.email,source_df.country,
                       col('valid_from'),col('valid_to'),col('is_active'))
    return updated_recs.union(non_updated_recs)
   


def get_updated_data(source_df,target_df):
    
    return source_df.join(target_df,source_df.id == target_df.id).\
            filter((target_df.is_active == 'Y') & 
                   (concat(source_df.name,source_df.email,source_df.country) != 
                   concat(target_df.name,target_df.email,target_df.country))).\
                        withColumn('is_active',lit('Y')).\
                        withColumn('valid_from',source_df.created_at).\
                        withColumn('valid_to',lit('9999-01-01')).\
                        select(source_df.id,source_df.name,source_df.email,
                       source_df.country,col('valid_from'),col('valid_to'),col('is_active'))


def process_scd_type2_data(source_df,target_df):
    
    new_records = get_new_data(source_df,target_df)
    existing_records = deactivate_records(source_df,target_df)
    updated_ecords = get_updated_data(source_df,target_df)
    final_records = existing_records.union(new_records).union(updated_ecords)
    return final_records



## Day-1 Suurce data processed

In [8]:
final_records = process_scd_type2_data(source_df,target_df) 
pd_write(final_records)
final_records.show()

+---+------+-------------------+-------+----------+----------+---------+
| id|  name|              email|country|valid_from|  valid_to|is_active|
+---+------+-------------------+-------+----------+----------+---------+
|  1|  KHAN|  khan101@gmail.com|  INDIA|2024-01-05|9999-01-01|        Y|
|  2|TAUKIR|taukir101@gmail.com| CANADA|2024-01-06|9999-01-01|        Y|
+---+------+-------------------+-------+----------+----------+---------+



## Day-2 Suurce data processed

In [9]:
file_path ="E:\workforce\spark\data\scd_type_data\source_emp_day2.csv"
source_df = read_data(spark, file_path)
source_df.show()

+---+------+-------------------+-------+----------+
| id|  name|              email|country|created_at|
+---+------+-------------------+-------+----------+
|  1|  KHAN|  khan101@gmail.com|  INDIA|2024-01-07|
|  2|TAUKIR|taukir101@gmail.com| CANADA|2024-01-07|
+---+------+-------------------+-------+----------+



In [10]:
target_file_path ="E:\\workforce\\spark\\data\\scd_type_data\\temp\\target_emp.csv"
target_df = read_data(spark, target_file_path)
target_df.show()

+---+------+-------------------+-------+----------+----------+---------+
| id|  name|              email|country|valid_from|  valid_to|is_active|
+---+------+-------------------+-------+----------+----------+---------+
|  1|  KHAN|  khan101@gmail.com|  INDIA|2024-01-05|9999-01-01|        Y|
|  2|TAUKIR|taukir101@gmail.com| CANADA|2024-01-06|9999-01-01|        Y|
+---+------+-------------------+-------+----------+----------+---------+



In [11]:
final_records = process_scd_type2_data(source_df,target_df) 
pd_write(final_records)
final_records.show()

+---+------+-------------------+-------+----------+----------+---------+
| id|  name|              email|country|valid_from|  valid_to|is_active|
+---+------+-------------------+-------+----------+----------+---------+
|  1|  KHAN|  khan101@gmail.com|  INDIA|2024-01-05|9999-01-01|        Y|
|  2|TAUKIR|taukir101@gmail.com| CANADA|2024-01-06|9999-01-01|        Y|
+---+------+-------------------+-------+----------+----------+---------+



## Day-3 Suurce data processed

In [12]:
file_path ="E:\workforce\spark\data\scd_type_data\source_emp_day3.csv"
source_df = read_data(spark, file_path)
source_df.show()

+---+------+-------------------+---------+----------+
| id|  name|              email|  country|created_at|
+---+------+-------------------+---------+----------+
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|
|  2|TAUKIR|taukir101@gmail.com|   CANADA|2024-01-07|
+---+------+-------------------+---------+----------+



In [13]:
target_file_path ="E:\\workforce\\spark\\data\\scd_type_data\\temp\\target_emp.csv"
target_df = read_data(spark, target_file_path)
target_df.show()

+---+------+-------------------+-------+----------+----------+---------+
| id|  name|              email|country|valid_from|  valid_to|is_active|
+---+------+-------------------+-------+----------+----------+---------+
|  1|  KHAN|  khan101@gmail.com|  INDIA|2024-01-05|9999-01-01|        Y|
|  2|TAUKIR|taukir101@gmail.com| CANADA|2024-01-06|9999-01-01|        Y|
+---+------+-------------------+-------+----------+----------+---------+



In [14]:
final_records = process_scd_type2_data(source_df,target_df) 
final_records.show()

+---+------+-------------------+---------+----------+----------+---------+
| id|  name|              email|  country|valid_from|  valid_to|is_active|
+---+------+-------------------+---------+----------+----------+---------+
|  2|TAUKIR|taukir101@gmail.com|   CANADA|2024-01-06|9999-01-01|        Y|
|  1|  KHAN|  khan101@gmail.com|    INDIA|2024-01-08|2024-04-07|        N|
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|9999-01-01|        Y|
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|9999-01-01|        Y|
+---+------+-------------------+---------+----------+----------+---------+



In [15]:
pd_write(final_records)
final_records.show()

+---+------+-------------------+---------+----------+----------+---------+
| id|  name|              email|  country|valid_from|  valid_to|is_active|
+---+------+-------------------+---------+----------+----------+---------+
|  2|TAUKIR|taukir101@gmail.com|   CANADA|2024-01-06|9999-01-01|        Y|
|  1|  KHAN|  khan101@gmail.com|    INDIA|2024-01-08|2024-04-07|        N|
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|9999-01-01|        Y|
+---+------+-------------------+---------+----------+----------+---------+



## Day-4 Suurce data processed

In [16]:
file_path ="E:\workforce\spark\data\scd_type_data\source_emp_day4.csv"
source_df = read_data(spark, file_path)
source_df.show()

+---+------+-------------------+---------+----------+
| id|  name|              email|  country|created_at|
+---+------+-------------------+---------+----------+
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|
|  2|TAUKIR|taukir101@gmail.com|   CANADA|2024-01-07|
|  3|   SAM|      sam@gmail.com|   CANADA|2024-01-15|
+---+------+-------------------+---------+----------+



In [17]:
target_file_path ="E:\\workforce\\spark\\data\\scd_type_data\\temp\\target_emp.csv"
target_df = read_data(spark, target_file_path)
target_df.show()

+---+------+-------------------+---------+----------+----------+---------+
| id|  name|              email|  country|valid_from|  valid_to|is_active|
+---+------+-------------------+---------+----------+----------+---------+
|  2|TAUKIR|taukir101@gmail.com|   CANADA|2024-01-06|9999-01-01|        Y|
|  1|  KHAN|  khan101@gmail.com|    INDIA|2024-01-08|2024-04-07|        N|
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|9999-01-01|        Y|
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|9999-01-01|        Y|
+---+------+-------------------+---------+----------+----------+---------+



In [18]:
final_records = process_scd_type2_data(source_df,target_df) 
final_records.show()

+---+------+-------------------+---------+----------+----------+---------+
| id|  name|              email|  country|valid_from|  valid_to|is_active|
+---+------+-------------------+---------+----------+----------+---------+
|  2|TAUKIR|taukir101@gmail.com|   CANADA|2024-01-06|9999-01-01|        Y|
|  1|  KHAN|  khan101@gmail.com|    INDIA|2024-01-08|2024-04-07|        N|
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|9999-01-01|        Y|
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|9999-01-01|        Y|
|  3|   SAM|      sam@gmail.com|   CANADA|2024-01-15|9999-01-01|        Y|
+---+------+-------------------+---------+----------+----------+---------+



In [19]:
pd_write(final_records)
final_records.show()

+---+------+-------------------+---------+----------+----------+---------+
| id|  name|              email|  country|valid_from|  valid_to|is_active|
+---+------+-------------------+---------+----------+----------+---------+
|  2|TAUKIR|taukir101@gmail.com|   CANADA|2024-01-06|9999-01-01|        Y|
|  1|  KHAN|  khan101@gmail.com|    INDIA|2024-01-08|2024-04-07|        N|
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|9999-01-01|        Y|
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|9999-01-01|        Y|
|  3|   SAM|      sam@gmail.com|   CANADA|2024-01-15|9999-01-01|        Y|
+---+------+-------------------+---------+----------+----------+---------+



```

+---+------+-------------------+---------+----------+----------+---------+
| id|  name|              email|  country|valid_from|  valid_to|is_active|
+---+------+-------------------+---------+----------+----------+---------+
|  2|TAUKIR|taukir101@gmail.com|   CANADA|2024-01-06|9999-01-01|        Y|
|  1|  KHAN|  khan101@gmail.com|    INDIA|2024-01-08|2024-04-07|        N|
|  1|  KHAN|  khan101@gmail.com|AUSTRALIA|2024-01-08|9999-01-01|        Y|
|  3|   SAM|      sam@gmail.com|   CANADA|2024-01-15|9999-01-01|        Y|
+---+------+-------------------+---------+----------+----------+---------+
```