In [1]:
from util import *
import pyspark.sql.functions as f
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import os

# Root of the data_pipeline checkout. Defaults to the original author's location so
# existing runs behave exactly as before; set DATA_PIPELINE_HOME to run the notebook
# from another machine without editing every path below.
data_pipeline_home = os.environ.get(
    "DATA_PIPELINE_HOME", "C:/Users/parth/Desktop/workspace/data_pipeline"
).rstrip("/")

# Sample inputs: the initial snapshot and the later incremental batch.
initial_data_file = f"{data_pipeline_home}/resources/sample_data/customers.csv"
updated_data_file = f"{data_pipeline_home}/resources/sample_data/customers_incremental.csv"

# The Hudi table is written to a folder named after the table.
table_name = "customers_incremental_with_history_modified"
output_path = f"{data_pipeline_home}/output/{table_name}"

# NOTE: the "parition_*" spelling is kept on purpose; other cells reference
# `parition_from` by this exact name.
primary_key_field = "id"
parition_field = "date"
precombine_field = "updated_date"
spark_write_mode = "append"
parition_from = "updated_date"  # column the "date" partition value is derived from

# get_incremental_options is not defined in this notebook (presumably provided by the
# `util` star import); the resulting options dict is displayed as the cell output.
hudi_options = get_incremental_options(table_name, primary_key_field, parition_field, precombine_field)
hudi_options

{'hoodie.table.name': 'customers_incremental_with_history_modified',
 'hoodie.datasource.write.recordkey.field': 'id',
 'hoodie.datasource.write.partitionpath.field': 'date',
 'hoodie.datasource.write.table.name': 'customers_incremental_with_history_modified',
 'hoodie.datasource.write.operation': 'upsert',
 'hoodie.datasource.write.precombine.field': 'updated_date'}

In [2]:
import os
import shutil

# Remove whatever an earlier run left at output_path so the table is rebuilt from scratch.
stale_output_present = os.path.exists(output_path)
if stale_output_present:
    shutil.rmtree(output_path)

In [3]:
# Spark session used by the read cells below. get_spark_with_hudi is not defined in
# this notebook; presumably it comes from the `util` star import and sets up the
# Hudi integration, but verify in util.
spark = get_spark_with_hudi()

In [4]:
# Load the initial customer snapshot, letting Spark infer the column types from the CSV.
source_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(initial_data_file)
)

timestamp_columns = ["created_date", "updated_date"]

# Fixed load timestamp stamped on every row of the initial batch.
w_create_ts = "2023-01-01 00:00:00"

# Cast explicitly so both columns are timestamps regardless of what inference chose.
for column in timestamp_columns:
    casted = f.col(column).cast("timestamp")
    source_df = source_df.withColumn(column, casted)

# "date" is the partition value (yyyyMMdd) derived from the `parition_from` column.
partition_value = f.date_format(f.col(parition_from), "yyyyMMdd")
load_timestamp = f.lit(w_create_ts).cast("timestamp")
processed_df = source_df.withColumn("date", partition_value).withColumn("w_create_ts", load_timestamp)

In [5]:
# Write the initial batch to the Hudi table at output_path using the options and
# save mode configured in the first cell.
hudi_writer = processed_df.write.format("hudi").options(**hudi_options)
hudi_writer.mode(spark_write_mode).save(output_path)

In [6]:
# Re-read the Hudi table from disk so the preview shows what was actually written.
target_df = spark.read.format("hudi").load(output_path)

# Limit on the Spark side before converting: toPandas() pulls every row it is given
# onto the driver, so trimming afterwards with head(10) would first collect the
# whole table just to show ten rows.
preview_columns = ["id", "name", "email", "created_date", "updated_date", "w_create_ts"]
target_df.select(*preview_columns).limit(10).toPandas()

Unnamed: 0,id,name,email,created_date,updated_date,w_create_ts
0,2,Brocky Spurret,bspurret1@npr.org,2023-01-01 05:35:00,2023-01-01 05:35:00,2023-01-01
1,1,Jammie McCamish,jmccamish0@devhub.com,2023-01-01 03:17:00,2023-01-01 03:17:00,2023-01-01


In [7]:
# Load the incremental batch; from here on source_df refers to the updated file.
source_df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load(updated_data_file)
)

timestamp_columns = ["created_date", "updated_date"]

# Cast explicitly so both columns are timestamps regardless of what inference chose.
for column in timestamp_columns:
    casted = f.col(column).cast("timestamp")
    source_df = source_df.withColumn(column, casted)

In [8]:
def resolve_column(column_name):
    """Build the merged value of ``column_name`` for a join of aliases ``l`` and ``r``.

    The incoming value (``r``) wins whenever it is present; otherwise the existing
    value (``l``) is kept.

    The previous ``when(r.isNotNull() & (l != r), r).otherwise(l)`` form gave the same
    answer when both sides were non-null, but ``l != r`` evaluates to NULL when ``l``
    is NULL, so a non-null incoming value could never replace a NULL existing value.
    ``coalesce`` covers that case as well.

    Args:
        column_name: Name of a column present on both the ``l`` and ``r`` aliases.

    Returns:
        A Column expression aliased back to ``column_name``.
    """
    incoming = f.col(f"r.{column_name}")
    existing = f.col(f"l.{column_name}")
    return f.coalesce(incoming, existing).alias(column_name)

In [9]:
# Line up the current table ("l") and the incremental batch ("r") on the same columns.
columns = source_df.columns
left_df = target_df.select(*columns).alias("l")
right_df = source_df.select(*columns).alias("r")

# NOTE(review): with how="left" only keys already in the target survive the join, so
# source rows with a new key are not carried forward. Confirm this is intended.
joined_df = left_df.join(right_df, on=primary_key_field, how="left")

# The key comes straight from the join; every other column goes through resolve_column.
columns.remove(primary_key_field)
resolved_columns = [primary_key_field, *(resolve_column(name) for name in columns)]
intermediate_df = joined_df.select(*resolved_columns)

In [10]:
from datetime import datetime
# Stamp this batch with the driver's current local time, formatted to the second.
w_create_ts = f"{datetime.now():%Y-%m-%d %H:%M:%S}"

# "date" is the partition value (yyyyMMdd) derived from the `parition_from` column.
partition_value = f.date_format(f.col(parition_from), "yyyyMMdd")
load_timestamp = f.lit(w_create_ts).cast("timestamp")
processed_df = intermediate_df.withColumn("date", partition_value).withColumn("w_create_ts", load_timestamp)

In [16]:
# Write the merged batch to the same Hudi table, reusing the options and save mode
# from the first cell.
hudi_writer = processed_df.write.format("hudi").options(**hudi_options)
hudi_writer.mode(spark_write_mode).save(output_path)

In [17]:
# Re-read the Hudi table so the preview reflects the state after the second write.
target_df = spark.read.format("hudi").load(output_path)

# Limit on the Spark side before converting: toPandas() pulls every row it is given
# onto the driver, so trimming afterwards with head(10) would first collect the
# whole table just to show ten rows.
preview_columns = ["id", "name", "email", "created_date", "updated_date", "w_create_ts"]
target_df.select(*preview_columns).limit(10).toPandas()

Unnamed: 0,id,name,email,created_date,updated_date,w_create_ts
0,1,Jammie McCamish,jmccamish0@develophub.com,2023-01-01 03:17:00,2023-01-02 03:17:00,2023-01-14 16:20:41
1,2,Brocky Spurret,bspurret1@npr.org,2023-01-01 05:35:00,2023-01-01 05:35:00,2023-01-14 16:20:41
