In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, concat_ws, current_date, lit, when, xxhash64
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

# Incoming snapshot: one row per customer carrying the latest attribute values.
source_data = [
    (101, 'Alice', 'Boston', 'MA'),
    (102, 'Bob', 'Aigaleo', 'IL'),
    (104, 'Charlie', 'Austin', 'TX'),
]
# Every field is declared nullable (True), exactly as in a hand-written StructType.
source_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('customer_id', IntegerType()),
        ('name', StringType()),
        ('city', StringType()),
        ('state', StringType()),
    )
])
source_df = spark.createDataFrame(source_data, schema=source_schema)

# Existing history table: the business columns above plus validity bookkeeping.
# Dates are 'yyyy-MM-dd' strings; an open (current) version has end_date = None and
# is_current = 'Y', an expired one is_current = 'N'.
target_data = [
    (101, 'Alice', 'New York', 'NY', '2022-01-01', '2023-03-15', 'N'),
    (101, 'Alice', 'Boston', 'MA', '2023-03-16', None, 'Y'),
    (102, 'Bob', 'Chicago', 'IL', '2022-01-01', None, 'Y'),
    (103, 'Charlie', 'Austin', 'TX', '2022-01-01', None, 'Y'),
]
target_schema = StructType(
    [StructField(field.name, field.dataType, True) for field in source_schema.fields]
    + [StructField(field_name, StringType(), True) for field_name in ('start_date', 'end_date', 'is_current')]
)
target_df = spark.createDataFrame(target_data, schema=target_schema)

In [0]:
# Show both inputs (incoming snapshot first, then the existing history) before processing.
for frame in (source_df, target_df):
    display(frame)

In [0]:
# Attach each customer's *active* target version (is_current = 'Y') to the incoming row.
# The left join keeps customers with no active version; their target_* columns are NULL.
active_version_match = (source_df.customer_id == target_df.customer_id) & (target_df.is_current == 'Y')
renamed_target_columns = [
    target_df[column_name].alias(f'target_{column_name}')
    for column_name in ('customer_id', 'name', 'city', 'state')
]
join_df = (
    source_df
    .join(target_df, on=active_version_match, how='left')
    .select(source_df['*'], *renamed_target_columns)
)
display(join_df)

In [0]:
# Keep only the source rows that need an SCD Type 2 action:
#   * NEW      - no active target row matched, so target_customer_id is NULL;
#   * MODIFIED - an active target row matched but a tracked attribute differs.
# Attributes are compared column by column with null-safe equality (<=>) instead of a
# combined xxhash64. Spark's hash functions skip NULL inputs, so rows such as
# ('Bob', NULL, 'IL') and (NULL, 'Bob', 'IL') would hash identically and a change would be
# missed. A NEW customer whose attributes are all NULL would also be dropped, because both
# hashes would equal the seed.
attributes_unchanged = (
    join_df['name'].eqNullSafe(join_df['target_name'])
    & join_df['city'].eqNullSafe(join_df['target_city'])
    & join_df['state'].eqNullSafe(join_df['target_state'])
)
filter_df = join_df.filter(join_df['target_customer_id'].isNull() | ~attributes_unchanged)

display(filter_df)

In [0]:
# Tag every changed row with MERGEDKEY = its business key rendered as a string. In the
# MERGE step this key matches the row's active target version, which then gets expired.
# For a composite primary key, list every key column in KEY_COLUMNS. concat_ws inserts
# a separator so that e.g. (1, 23) and (12, 3) do not collapse into the same key "123",
# as they would with a plain concat.
# NOTE: concat_ws skips NULLs, so this assumes key columns are never NULL.
KEY_COLUMNS = ['customer_id']

merged_df = filter_df.withColumn(
    'MERGEDKEY',
    concat_ws('|', *[col(key_column).cast('string') for key_column in KEY_COLUMNS]),
)
display(merged_df)

In [0]:
# Second copy of every MODIFIED row (one that matched an active target row), with
# MERGEDKEY forced to NULL. A NULL key never satisfies the MERGE condition, so this copy
# is inserted as the new current version.
# The NULL is cast to string to match the string-typed MERGEDKEY of merged_df. An
# untyped lit(None) produces a void (NullType) column, which Parquet/Delta cannot store.
dummy_df = (
    filter_df
    .filter(col('target_customer_id').isNotNull())
    .withColumn('MERGEDKEY', lit(None).cast('string'))
)
display(dummy_df)

In [0]:
# Stack both frames into the MERGE source:
#   * NEW rows appear once, with MERGEDKEY set (no active target row matches -> insert);
#   * MODIFIED rows appear twice: MERGEDKEY set (expires the active version) and
#     MERGEDKEY NULL (inserted as the new current version).
# unionByName aligns columns by name rather than by position, so a change in either
# frame's column order cannot silently shift values into the wrong columns.
union_df = merged_df.unionByName(dummy_df)
display(union_df)

In [0]:
# Apply the SCD Type 2 changes with a Delta MERGE.
# target_df is an in-memory DataFrame and has no .merge method. The demo target is
# therefore first persisted as a Delta table (overwrite keeps this cell re-runnable), and
# the MERGE is expressed in SQL against it.
TARGET_TABLE = 'scd2_demo_customers'
CHANGES_VIEW = 'scd2_demo_changes'

target_df.write.format('delta').mode('overwrite').saveAsTable(TARGET_TABLE)
union_df.createOrReplaceTempView(CHANGES_VIEW)

# Join on MERGEDKEY, not customer_id. Each MODIFIED customer has two source rows:
#   * the copy carrying the key matches the active target row and expires it;
#   * the copy with a NULL key never matches and is inserted as the new current version.
# Joining on customer_id would let both copies match the same target row, which Delta
# rejects, and no new version would be inserted.
# Target dates are 'yyyy-MM-dd' strings. An expired version ends the day before its
# successor starts, matching the existing history (end 2023-03-15 -> start 2023-03-16).
spark.sql(f"""
    MERGE INTO {TARGET_TABLE} AS target
    USING {CHANGES_VIEW} AS source
    ON CAST(target.customer_id AS STRING) = source.MERGEDKEY
       AND target.is_current = 'Y'
    WHEN MATCHED THEN UPDATE SET
        is_current = 'N',
        end_date   = CAST(date_sub(current_date(), 1) AS STRING)
    WHEN NOT MATCHED THEN INSERT
        (customer_id, name, city, state, start_date, end_date, is_current)
    VALUES
        (source.customer_id, source.name, source.city, source.state,
         CAST(current_date() AS STRING), NULL, 'Y')
""")

display(spark.table(TARGET_TABLE))