In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
import pyspark.sql.functions as F

In [6]:
# Spark session for this notebook (getOrCreate reuses an active session if one exists).
spark = SparkSession.builder.appName("CustomerHistorization").getOrCreate()

# Column names shared by every sample DataFrame below; Spark infers the types.
columns = ["CustomerID", "Name", "Email"]

# Initial snapshot of the customer table.
initial_data = [
    (1, "Alice", "alice@example.com"),
    (2, "Bob", "bob@example.com"),
    (3, "Charlie", "charlie@example.com"),
    (6, "Frank", "frank@example.com"),
]
df = spark.createDataFrame(initial_data, columns)

# New values for customers that already exist in `df` (simulated updates).
# TODO: derive these as the inner join between `df` and the incoming batch (`incoming_df`).
changes_data = [
    (1, "Alice", "alice.new@example.com"),
    (3, "Chandler", "charlie@example.com"),
]
changes_df = spark.createDataFrame(changes_data, columns)

# Brand-new customers (simulated incremental load).
# TODO: derive these as the incoming rows left over after the join above.
incremental_data = [
    (4, "David", "david@example.com"),
    (5, "Eva", "eva@example.com"),
]
incremental_df = spark.createDataFrame(incremental_data, columns)

# Keys of the customers to delete (simulated deletions).
deletions = [2]

# Create the update DataFrame (an audit record of each change, used to undo it later)

In [7]:
# Build the `update` DataFrame: one audit row per pending change, holding the
# state needed to undo that change.
#   - Deletion:     the full row as it currently exists in `df`
#   - Modification: the current (pre-change) row from `df`
#   - Addition:     only the key; the other columns are typed null placeholders

# Record deletion operations
update = (
    df
    .filter(F.col("CustomerID").isin(deletions))
    .withColumn("ChangeType", lit("Deletion"))
)

# Record modification operations. A left-semi join keeps only `df`'s columns
# for keys that also appear in `changes_df`, so the OLD values are stored and
# can later be restored.
update = update.unionByName(
    df
    .join(changes_df, ["CustomerID"], "leftsemi")
    .withColumn("ChangeType", lit("Modification"))
)

# Record addition operations (new rows). Only the key is needed to undo an
# addition, so every other column is replaced by a null. The null is cast to
# the column's original type: a bare lit(None) is NullType ("void"), which only
# becomes a string through union type widening. A single select is used instead
# of withColumn in a loop, which builds one projection per column.
null_cols = [c for c in columns if c != "CustomerID"]
addition_keys_df = incremental_df.select(
    *[
        F.lit(None).cast(incremental_df.schema[c].dataType).alias(c)
        if c in null_cols
        else F.col(c)
        for c in incremental_df.columns
    ]
)

update = update.unionByName(
    addition_keys_df.withColumn("ChangeType", lit("Addition"))
)

# Add timestamp.
# NOTE(review): current_timestamp() is evaluated when an action runs, not when
# this cell runs, so each show()/collect() may report a different time. Pin it
# with lit(<python datetime>) if a stable audit time is required.
update = update.withColumn("Timestamp", current_timestamp())

update.show()

+----------+-------+-------------------+------------+--------------------+
|CustomerID|   Name|              Email|  ChangeType|           Timestamp|
+----------+-------+-------------------+------------+--------------------+
|         2|    Bob|    bob@example.com|    Deletion|2023-08-21 15:27:...|
|         1|  Alice|  alice@example.com|Modification|2023-08-21 15:27:...|
|         3|Charlie|charlie@example.com|Modification|2023-08-21 15:27:...|
|         4|   null|               null|    Addition|2023-08-21 15:27:...|
|         5|   null|               null|    Addition|2023-08-21 15:27:...|
+----------+-------+-------------------+------------+--------------------+



# Perform the operations

In [8]:
# Apply the pending changes to `df` in the order deletions → modifications →
# additions. The undo cell reverses them in the opposite order.

# Deletions: drop every row whose key is listed in `deletions`.
df = df.filter(~F.col("CustomerID").isin(deletions))

# Modifications: replace existing rows with their new values. Only keys that
# already exist in `df` are updated. The update record captures modifications
# of existing rows only (left-semi join against `df`), so inserting an unknown
# key here would create a change that could never be undone.
modifications_df = changes_df.join(df, ["CustomerID"], "leftsemi")
df = (
    df
    .join(modifications_df, ["CustomerID"], "leftanti")
    .unionByName(modifications_df)
)

# Additions: append the new customers.
df = df.unionByName(incremental_df)

df.show()

+----------+--------+--------------------+
|CustomerID|    Name|               Email|
+----------+--------+--------------------+
|         6|   Frank|   frank@example.com|
|         1|   Alice|alice.new@example...|
|         3|Chandler| charlie@example.com|
|         4|   David|   david@example.com|
|         5|     Eva|     eva@example.com|
+----------+--------+--------------------+



# Reverse the operations

In [9]:
# Undo the applied changes using the audit rows stored in `update`. The steps
# run in the reverse order of application: additions → modifications → deletions.

# Reverse additions: drop every row whose key was recorded as an addition.
# Only the key is needed for the anti-join.
added_keys_df = update.filter(F.col("ChangeType") == "Addition").select("CustomerID")
df = df.join(added_keys_df, ["CustomerID"], "leftanti")

# Reverse modifications: restore the pre-change values recorded in `update`.
# A dedicated name is used so that `changes_df` (the new values consumed by the
# apply cell) is not overwritten. Re-running that cell would otherwise
# re-apply the OLD values.
previous_values_df = update.filter(F.col("ChangeType") == "Modification").select(*columns)
df = (
    df
    .join(previous_values_df, ["CustomerID"], "leftanti")
    .unionByName(previous_values_df)
)

# Reverse deletions: re-insert the full rows that were deleted.
deleted_rows_df = update.filter(F.col("ChangeType") == "Deletion").select(*columns)
df = df.unionByName(deleted_rows_df)

df.show()

                                                                                

+----------+-------+-------------------+
|CustomerID|   Name|              Email|
+----------+-------+-------------------+
|         6|  Frank|  frank@example.com|
|         1|  Alice|  alice@example.com|
|         3|Charlie|charlie@example.com|
|         2|    Bob|    bob@example.com|
+----------+-------+-------------------+

