In [10]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, when, lit, unix_timestamp

# Set up Spark session
package = "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0"
spark = SparkSession.builder \
    .appName("Hudi Basics") \
    .master("local[*]") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.jars.packages", package) \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")\
    .getOrCreate()


In [11]:
spark

In [26]:
# Hudi write options
hudi_options = {
    'hoodie.table.name': 'test_cdc_table1',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'precombine_key',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

# Function to create precombine key
def create_precombine_key(df):
    df = df.withColumn("op_value", 
                       when(col("operation_type") == "U", 3)
                       .when(col("operation_type") == "I", 2)
                       .when(col("operation_type") == "D", 1)
                       .otherwise(0))
    
    return df.withColumn("precombine_key", 
                         unix_timestamp(col("operation_datetime")).cast("long") + col("op_value"))


In [27]:
# Function to handle deletes
def handle_deletes(df):
    deletes = df.filter(col("operation_type") == "D")
    non_deletes = df.filter(col("operation_type") != "D")
    
    deletes = deletes.withColumn("_hoodie_is_deleted", lit(True))
    non_deletes = non_deletes.withColumn("_hoodie_is_deleted", lit(False))
    
    return non_deletes.union(deletes)


In [28]:
# Sample data
data = [
    # Scenario 1: Multiple operations on the same key
    ("1", "2023-08-10 10:00:00", "I", "Initial insert"),
    ("1", "2023-08-10 11:00:00", "U", "Update 1 hour later"),
    ("1", "2023-08-10 12:00:00", "D", "Delete 2 hours later"),
    ("1", "2023-08-10 13:00:00", "I", "Re-insert 3 hours later"),

    # Scenario 2: Operations with the same timestamp
    ("2", "2023-08-10 14:00:00", "I", "Initial insert"),
    ("2", "2023-08-10 14:00:00", "U", "Update at the same time"),

    # Scenario 3: Delete followed by insert
    ("3", "2023-08-10 15:00:00", "I", "Initial insert"),
    ("3", "2023-08-10 16:00:00", "D", "Delete"),
    ("3", "2023-08-10 17:00:00", "I", "Re-insert"),

    # Scenario 4: Multiple updates
    ("4", "2023-08-10 18:00:00", "I", "Initial insert"),
    ("4", "2023-08-10 19:00:00", "U", "Update 1"),
    ("4", "2023-08-10 20:00:00", "U", "Update 2"),

    # Scenario 5: Insert after delete
    ("5", "2023-08-10 21:00:00", "I", "Initial insert"),
    ("5", "2023-08-10 22:00:00", "D", "Delete"),
    ("5", "2023-08-10 23:00:00", "I", "New insert")
]

# Create a DataFrame
df = spark.createDataFrame(data, ["id", "operation_datetime", "operation_type"])

print("Original DataFrame:")
df.show()

# Create the precombine key
df_with_precombine = create_precombine_key(df)
print("DataFrame with precombine key:")
df_with_precombine.show()

# Process deletes
df_processed = handle_deletes(df_with_precombine)

print("Processed DataFrame:")
df_processed.show()

# Write to Hudi
df_processed.write.format("hudi").options(**hudi_options).mode("overwrite").save("/home/jovyan/test_cdc_table1")


Original DataFrame:
+---+-------------------+--------------+--------------------+
| id| operation_datetime|operation_type|                  _4|
+---+-------------------+--------------+--------------------+
|  1|2023-08-10 10:00:00|             I|      Initial insert|
|  1|2023-08-10 11:00:00|             U| Update 1 hour later|
|  1|2023-08-10 12:00:00|             D|Delete 2 hours later|
|  1|2023-08-10 13:00:00|             I|Re-insert 3 hours...|
|  2|2023-08-10 14:00:00|             I|      Initial insert|
|  2|2023-08-10 14:00:00|             U|Update at the sam...|
|  3|2023-08-10 15:00:00|             I|      Initial insert|
|  3|2023-08-10 16:00:00|             D|              Delete|
|  3|2023-08-10 17:00:00|             I|           Re-insert|
|  4|2023-08-10 18:00:00|             I|      Initial insert|
|  4|2023-08-10 19:00:00|             U|            Update 1|
|  4|2023-08-10 20:00:00|             U|            Update 2|
|  5|2023-08-10 21:00:00|             I|      Init

In [17]:
# Reading from Hudi table
hudi_df = spark.read.format("hudi").load("/home/jovyan/test_cdc_table")
print("Data read from Hudi table:")
hudi_df.show(truncate=False)

Data read from Hudi table:
+-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------------------+--------------+-----------------------+--------+--------------+------------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                        |id |operation_datetime |operation_type|_4                     |op_value|precombine_key|_hoodie_is_deleted|
+-------------------+---------------------+------------------+----------------------+-------------------------------------------------------------------------+---+-------------------+--------------+-----------------------+--------+--------------+------------------+
|20240810122847683  |20240810122847683_0_0|1                 |                      |bf092cf4-ade6-4178-9dcd-b139dce3d42c-0_0-33-119_20240810122847683.parquet|1  |2023-08-10 1

In [21]:
df_processed.orderBy([col("id"), col("precombine_key")]).show()


+---+-------------------+--------------+--------------------+--------+--------------+------------------+
| id| operation_datetime|operation_type|                  _4|op_value|precombine_key|_hoodie_is_deleted|
+---+-------------------+--------------+--------------------+--------+--------------+------------------+
|  1|2023-08-10 10:00:00|             I|      Initial insert|       2|   16916616002|             false|
|  1|2023-08-10 11:00:00|             U| Update 1 hour later|       3|   16916652003|             false|
|  1|2023-08-10 12:00:00|             D|Delete 2 hours later|       1|   16916688001|              true|
|  1|2023-08-10 13:00:00|             I|Re-insert 3 hours...|       2|   16916724002|             false|
|  2|2023-08-10 14:00:00|             I|      Initial insert|       2|   16916760002|             false|
|  2|2023-08-10 14:00:00|             U|Update at the sam...|       3|   16916760003|             false|
|  3|2023-08-10 15:00:00|             I|      Initial i