# Introduction

Hudi is a rich platform for building streaming data lakes with incremental data pipelines on a self-managing database layer, while being optimized for lake engines and regular batch processing.<br>
With ever-growing data variety and volume, data lakes became popular as centralized repositories that let you store structured and unstructured data at any scale. Data lakes satisfy storage requirements and provide analytical capabilities, but they do not address "transactional" requirements such as DML operations and ACID (Atomicity, Consistency, Isolation, Durability) transactions for use cases with concurrent reads and writes. Consumer privacy laws such as GDPR and CCPA bring further requirements that challenge traditional data lake designs. Additionally, as businesses evolve, more is expected of data lakes, such as the ability to apply Change Data Capture (CDC) at low latency, to roll back changes, and to travel back in time for point-in-time queries.<br>

__Apache Hudi__ is a lakehouse data platform technology that provides an incremental processing framework to power business-critical data pipelines, combining the benefits of stream and batch processing at low latency and high performance. Hudi provides the flexibility to choose query engines such as Spark, Presto, Hive, Trino, and Amazon Athena, and to build pipelines using Spark, Flink, or Hive.<br>

The Apache Hudi table format offers capabilities similar to those of a traditional RDBMS, but in a fully open table format, so multiple engines such as Spark, Trino, and Presto can operate on the same dataset. It provides powerful features such as:<br>

- DML operations such as upserts and deletes
- Time travel: query point-in-time data and access a sequential audit log of the actions performed on the table
- Transactions, rollbacks, and concurrency control, with concurrent reads and writes from multiple applications
- Automatic file sizing, data clustering, compaction, and cleaning for efficient storage management of data and metadata
- Streaming ingestion, built-in CDC sources and tools, and the ability to apply change data capture incrementally

For more information, refer to [Apache Hudi Documentation](https://hudi.apache.org/docs/overview)


## Prerequisites
To execute the code in this notebook, you will need the following:<br>

- An AWS account <br>

__The following services should be created and configured__<br>

- EMR Studio
- EMR Studio Workspace
- EMR on EKS Virtual Cluster
- EKS Cluster (EC2 based)
- Managed Endpoint (Hudi options configured within the managed endpoint)
- IAM Policy
- Application Load Balancer
- VPC and Subnet

__<font color=red>_Important step for Hudi to work with EMR Studio (Jupyter Notebook - JUPYTER_ENTERPRISE_GATEWAY) on an EMR-on-EKS cluster_</font>__

Because a Jupyter notebook attached to EMR-on-EKS does not support cell magics, there is no way to set Spark-specific parameters, including the Hudi JARs, from the notebook itself. For this reason, they must be configured as part of the managed endpoint, as shown in the configuration below. These values are defaults; you can change them to suit your needs, for example by increasing spark.executor.memory from 2G to 4G or 8G.


    "configurationOverrides": {
        "applicationConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {
                    "spark.executor.memory": "2G",
                    "spark.driver.memory": "2G",
                    "spark.sql.hive.convertMetastoreParquet": "false",
                    "spark.kubernetes.executor.request.cores": "1.5",
                    "spark.driver.cores": "1",
                    "spark.sql.catalogImplementation": "hive",
                    "spark.executor.cores": "1",
                    "spark.dynamicAllocation.maxExecutors": "20",
                    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
                    "spark.dynamicAllocation.shuffleTracking.timeout": "300s",
                    "spark.kubernetes.driver.request.cores": "0.5",
                    "spark.kubernetes.allocation.batch.size": "2",
                    "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                    "spark.dynamicAllocation.minExecutors": "0",
                    "spark.dynamicAllocation.enabled": "true",
                    "spark.dynamicAllocation.executorAllocationRatio": "1",
                    "spark.jars": "local:///usr/lib/hudi/hudi-spark-bundle.jar,local:///usr/lib/spark/external/lib/spark-avro.jar"
                }
            }
        ]
    }
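
The override block above can also be generated programmatically, which makes it easier to vary a setting such as executor memory per endpoint. The following is only a sketch: `build_configuration_overrides` and `BASE_SPARK_DEFAULTS` are hypothetical names introduced here (they are not part of any AWS SDK), and only a subset of the properties listed above is reproduced.

```python
import json

# Subset of the spark-defaults shown above; extend it with the remaining keys as needed.
BASE_SPARK_DEFAULTS = {
    "spark.executor.memory": "2G",
    "spark.driver.memory": "2G",
    "spark.sql.hive.convertMetastoreParquet": "false",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.jars": "local:///usr/lib/hudi/hudi-spark-bundle.jar,"
                  "local:///usr/lib/spark/external/lib/spark-avro.jar",
}


def build_configuration_overrides(overrides=None):
    """Hypothetical helper: merge user overrides into the default properties."""
    properties = {**BASE_SPARK_DEFAULTS, **(overrides or {})}
    return {
        "configurationOverrides": {
            "applicationConfiguration": [
                {"classification": "spark-defaults", "properties": properties}
            ]
        }
    }


# Example: raise executor memory from 2G to 4G and keep everything else unchanged.
config = build_configuration_overrides({"spark.executor.memory": "4G"})
print(json.dumps(config, indent=4))
```

The printed JSON has the same shape as the block above and could be pasted into the managed endpoint definition.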

### Current Setup used for this notebook
- EMR version: __emr-6.8.0-latest__
- EKS version: __1.21__
- Instance Type for EKS cluster: __m5.xlarge__
- No of Instances: __3__
- Hudi Version: __0.11.1-amzn-0__
- Spark Version: __3.3.0__

In [7]:
# Import a few libraries
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime


## Create Dataframe and define Schema

In [8]:
# Current time truncated to whole seconds, used as last_updated_time for every row
now = datetime.now().replace(microsecond=0)

InputData = [
    (1,'Prasad Nadig', 25, 'NJ','2022-01-01', now),
    (2,'Ethereum', 80, 'NY', '2022-01-02', now),
    (3,'Cosmos', 25, 'PA', '2022-01-03', now),
    (4,'Solana', 55, 'MD', '2022-01-04', now),
    (5,'Cardano', 15, 'TX', '2022-01-05', now),
    (6,'Link', 45, 'NJ', '2022-01-06', now)
]

# Define schema for the source data
schema = StructType([
    StructField("cust_id", IntegerType(), True),
    StructField("cust_name", StringType(), True),
    StructField("cust_age", IntegerType(), True),
    StructField("cust_loc", StringType(), True),
    StructField("create_date", StringType(), True),
    StructField("last_updated_time", TimestampType(), True)
])

# Create dataframe from the input data and the corresponding schema
inputDF = spark.createDataFrame(data=InputData, schema=schema)

In [9]:
#Check data 
inputDF.show()

+-------+------------+--------+--------+-----------+-------------------+
|cust_id|   cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------+------------+--------+--------+-----------+-------------------+
|      1|Prasad Nadig|      25|      NJ| 2022-01-01|2022-11-28 15:06:58|
|      2|    Ethereum|      80|      NY| 2022-01-02|2022-11-28 15:06:58|
|      3|      Cosmos|      25|      PA| 2022-01-03|2022-11-28 15:06:58|
|      4|      Solana|      55|      MD| 2022-01-04|2022-11-28 15:06:58|
|      5|     Cardano|      15|      TX| 2022-01-05|2022-11-28 15:06:58|
|      6|        Link|      45|      NJ| 2022-01-06|2022-11-28 15:06:58|
+-------+------------+--------+--------+-----------+-------------------+



## Define HUDI options, write data to S3 as HUDI dataset - INSERT

In [15]:
# Define the Hudi options that we will pass below when writing the data to S3 as a HUDI dataset
# Hudi provides many other options that you can set for your use case; refer to the HUDI docs (https://hudi.apache.org/docs/configurations/) for details
# Ensure partitionpath.field and partition_fields have the same value so that they stay in sync

hudiOptions = {
'hoodie.table.name': 'customer',
'hoodie.datasource.write.recordkey.field': 'cust_id',
'hoodie.datasource.write.partitionpath.field': 'create_date',
'hoodie.datasource.write.precombine.field': 'last_updated_time',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.use_jdbc': 'false',
'hoodie.datasource.hive_sync.mode':'hms',
'hoodie.datasource.hive_sync.table': 'customer',
'hoodie.datasource.hive_sync.partition_fields': 'create_date',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}
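
The requirement that the write partition field and the Hive sync partition field stay in sync can be checked before writing. The sketch below uses a hypothetical helper, `partition_fields_in_sync`, and a trimmed copy of the options (named `example_options` here) so that it runs on its own; it is not a Hudi API.

```python
def partition_fields_in_sync(options):
    """Hypothetical check: compare the write partition field(s) with the Hive sync ones."""

    def split_fields(value):
        # Both options accept a comma-separated list of field names.
        return [field.strip() for field in value.split(",") if field.strip()]

    write_fields = split_fields(options.get("hoodie.datasource.write.partitionpath.field", ""))
    sync_fields = split_fields(options.get("hoodie.datasource.hive_sync.partition_fields", ""))
    return write_fields == sync_fields


# Trimmed copy of the options defined above, for illustration only.
example_options = {
    "hoodie.table.name": "customer",
    "hoodie.datasource.write.partitionpath.field": "create_date",
    "hoodie.datasource.hive_sync.partition_fields": "create_date",
}

print(partition_fields_in_sync(example_options))

# A mismatch is reported as False.
mismatched = dict(example_options)
mismatched["hoodie.datasource.hive_sync.partition_fields"] = "cust_loc"
print(partition_fields_in_sync(mismatched))
```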

In [17]:
# Write the dataframe that we created above to S3 as a HUDI dataset.
# As this is our first write, we will use 'insert' for write.operation and 'overwrite' for .mode

inputDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'insert') \
.options(**hudiOptions) \
.mode('overwrite') \
.save('s3://emr-studio-emr-on-eks/hudi-tables/')

## Read data from HUDI Dataset we just created

In [18]:
# By default HUDI performs snapshot queries. 
snapshotQueryDF = spark.read \
    .format('org.apache.hudi') \
    .load('s3://emr-studio-emr-on-eks/hudi-tables' + '/*/*')
    
snapshotQueryDF.select("cust_id", "cust_name", "cust_age", "cust_loc", "create_date", "last_updated_time").orderBy("cust_id").show()

+-------+------------+--------+--------+-----------+-------------------+
|cust_id|   cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------+------------+--------+--------+-----------+-------------------+
|      1|Prasad Nadig|      25|      NJ| 2022-01-01|2022-11-29 05:35:55|
|      2|    Ethereum|      80|      NY| 2022-01-02|2022-11-29 05:35:55|
|      3|      Cosmos|      25|      PA| 2022-01-03|2022-11-29 05:35:55|
|      4|      Solana|      55|      MD| 2022-01-04|2022-11-29 05:35:55|
|      5|     Cardano|      15|      TX| 2022-01-05|2022-11-29 05:35:55|
|      6|        Link|      45|      NJ| 2022-01-06|2022-11-29 05:35:55|
+-------+------------+--------+--------+-----------+-------------------+



# DML Operations

## UPSERT
### - HUDI provides three write operations: Upsert, Insert, and Bulk Insert. We used Insert in the previous steps; now let's try the Upsert operation

In [19]:
# We will update an existing record and insert a new one. The upsert operation in HUDI looks up each record by its record key: if the key is found, the record is updated; otherwise the record is inserted.
# Current time truncated to whole seconds, used as last_updated_time for both rows
now = datetime.now().replace(microsecond=0)

InputData = [
    (1,'Prasad S Nadig', 30, 'NJ','2022-01-01', now), #Update
    (7,'Compound', 20, 'NJ', '2022-01-07', now) #Insert
]

# Define schema for the source data
schema = StructType([
    StructField("cust_id", IntegerType(), True),
    StructField("cust_name", StringType(), True),
    StructField("cust_age", IntegerType(), True),
    StructField("cust_loc", StringType(), True),
    StructField("create_date", StringType(), True),
    StructField("last_updated_time", TimestampType(), True)
])

# Create dataframe from the input data and the corresponding schema
updateDF = spark.createDataFrame(data=InputData, schema=schema)

In [20]:
# Now we will update/insert the data in the HUDI dataset on S3. Instead of "insert" we use "upsert" for write.operation, and instead of "overwrite" we use "append" for .mode

updateDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'upsert') \
.options(**hudiOptions) \
.mode('append') \
.save('s3://emr-studio-emr-on-eks/hudi-tables/')

In [21]:
# The existing record should now be updated and the new record inserted in the HUDI dataset
# You should see that cust_name and cust_age for cust_id=1 are updated and that a new record, cust_id=7, is inserted.
# Also notice that last_updated_time has changed for cust_id=1

snapshotQueryDF = spark.read \
    .format('org.apache.hudi') \
    .load('s3://emr-studio-emr-on-eks/hudi-tables' + '/*/*')
    
snapshotQueryDF.select("cust_id", "cust_name", "cust_age", "cust_loc", "create_date", "last_updated_time").orderBy("cust_id").show()

+-------+--------------+--------+--------+-----------+-------------------+
|cust_id|     cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------+--------------+--------+--------+-----------+-------------------+
|      1|Prasad S Nadig|      30|      NJ| 2022-01-01|2022-11-29 06:16:37|
|      2|      Ethereum|      80|      NY| 2022-01-02|2022-11-29 05:35:55|
|      3|        Cosmos|      25|      PA| 2022-01-03|2022-11-29 05:35:55|
|      4|        Solana|      55|      MD| 2022-01-04|2022-11-29 05:35:55|
|      5|       Cardano|      15|      TX| 2022-01-05|2022-11-29 05:35:55|
|      6|          Link|      45|      NJ| 2022-01-06|2022-11-29 05:35:55|
|      7|      Compound|      20|      NJ| 2022-01-07|2022-11-29 06:16:37|
+-------+--------------+--------+--------+-----------+-------------------+
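
The result above matches a simple mental model of upsert: records are matched on the record key (`cust_id`), unmatched keys are inserted, and matched keys are replaced by the incoming record. The pure-Python sketch below illustrates that model on two existing rows. It is a simplification written for this notebook, not Hudi's implementation: the `upsert` function is hypothetical, and the rule used here (the incoming row wins when its `last_updated_time` is not older) is an assumption, since the exact merge behaviour depends on the configured payload class.

```python
def upsert(table, incoming, key="cust_id", precombine="last_updated_time"):
    """Simplified model: merge incoming rows into table, matching on the record key."""
    merged = {row[key]: row for row in table}
    for row in incoming:
        current = merged.get(row[key])
        # Insert when the key is new; otherwise keep the row with the newer precombine value.
        if current is None or row[precombine] >= current[precombine]:
            merged[row[key]] = row
    return sorted(merged.values(), key=lambda row: row[key])


existing = [
    {"cust_id": 1, "cust_name": "Prasad Nadig", "cust_age": 25, "last_updated_time": "2022-11-29 05:35:55"},
    {"cust_id": 2, "cust_name": "Ethereum", "cust_age": 80, "last_updated_time": "2022-11-29 05:35:55"},
]
changes = [
    {"cust_id": 1, "cust_name": "Prasad S Nadig", "cust_age": 30, "last_updated_time": "2022-11-29 06:16:37"},
    {"cust_id": 7, "cust_name": "Compound", "cust_age": 20, "last_updated_time": "2022-11-29 06:16:37"},
]

result = upsert(existing, changes)
for row in result:
    print(row["cust_id"], row["cust_name"], row["cust_age"], row["last_updated_time"])
```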



## DELETE

In [22]:
# HUDI allows you to delete records just like traditional RDBMS, so let's delete a record
deleteDF = snapshotQueryDF.where("cust_id==6")

In [23]:
# Write to HUDI dataset to apply the deletes
# We will pass empty payload to permanently delete the record from the HUDI dataset

deleteDF.write \
.format('org.apache.hudi') \
.option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.EmptyHoodieRecordPayload") \
.options(**hudiOptions) \
.mode('append') \
.save('s3://emr-studio-emr-on-eks/hudi-tables/')

In [24]:
# Notice that cust_id=6 has been permanently deleted from the dataset
deleteReadDF = spark.read \
    .format('org.apache.hudi') \
    .load('s3://emr-studio-emr-on-eks/hudi-tables' + '/*/*')
    
deleteReadDF.select("cust_id", "cust_name", "cust_age", "cust_loc", "create_date", "last_updated_time").orderBy("cust_id").show()


+-------+--------------+--------+--------+-----------+-------------------+
|cust_id|     cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------+--------------+--------+--------+-----------+-------------------+
|      1|Prasad S Nadig|      30|      NJ| 2022-01-01|2022-11-29 06:16:37|
|      2|      Ethereum|      80|      NY| 2022-01-02|2022-11-29 05:35:55|
|      3|        Cosmos|      25|      PA| 2022-01-03|2022-11-29 05:35:55|
|      4|        Solana|      55|      MD| 2022-01-04|2022-11-29 05:35:55|
|      5|       Cardano|      15|      TX| 2022-01-05|2022-11-29 05:35:55|
|      7|      Compound|      20|      NJ| 2022-01-07|2022-11-29 06:16:37|
+-------+--------------+--------+--------+-----------+-------------------+



## Time Travel - Point in time Query

In [25]:
spark.read  \
    .format("hudi") \
    .load('s3://emr-studio-emr-on-eks/hudi-tables' + '/*/*') \
    .createOrReplaceTempView("hudi_snapshot")

### Commit time

In [26]:
# Hudi stores the commit time of each DML operation performed on a record key in its metadata columns.
# The query below fetches the commit time of the INSERT and of the UPSERT for each record key
spark.sql("select _hoodie_commit_time as commitTime, _hoodie_record_key as primaryKey, _hoodie_partition_path as partition from  hudi_snapshot order by commitTime").show()

+-----------------+----------+----------+
|       commitTime|primaryKey| partition|
+-----------------+----------+----------+
|20221129060138055|         2|2022-01-02|
|20221129060138055|         5|2022-01-05|
|20221129060138055|         3|2022-01-03|
|20221129060138055|         4|2022-01-04|
|20221129061653922|         1|2022-01-01|
|20221129061653922|         7|2022-01-07|
+-----------------+----------+----------+



In [27]:
# Let's get the distinct commit times from the Hudi dataset
commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_snapshot order by commitTime").limit(50).collect()))

In [28]:
#print to verify the distinct values for commits
print(commits)

['20221129060138055', '20221129061653922']
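
The commit times printed above are 17-digit strings. Assuming they follow the pattern `yyyyMMddHHmmssSSS` (year through seconds, then milliseconds), which is consistent with the values shown, they can be converted to Python `datetime` objects as sketched below. `parse_instant` and `example_commits` are hypothetical names used only for this illustration.

```python
from datetime import datetime


def parse_instant(instant):
    """Convert a 17-digit commit time (assumed yyyyMMddHHmmssSSS) into a datetime."""
    # The first 14 digits hold the date and the time down to seconds.
    base = datetime.strptime(instant[:14], "%Y%m%d%H%M%S")
    # The last 3 digits are milliseconds.
    millis = int(instant[14:17])
    return base.replace(microsecond=millis * 1000)


# Values copied from the output above.
example_commits = ['20221129060138055', '20221129061653922']

for instant in example_commits:
    print(instant, "->", parse_instant(instant).isoformat(timespec="milliseconds"))

# Time elapsed between the initial insert and the upsert.
print(parse_instant(example_commits[1]) - parse_instant(example_commits[0]))
```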


In [29]:
#Set parameters for Hudi options
startTime = "000" # Fetches all available commits since start.
endTime = commits[len(commits) - 2] # Fetches the initial commit time

# Define Hudi options to query point in time data with a start and end time
time_travel_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.end.instanttime': endTime,
    'hoodie.datasource.read.begin.instanttime': startTime
}

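# Get the initial table as it was before the upsert and delete (original inserts)
# Keep the DataFrame in the variable; .show() returns None, so it is called separately
df_time_travel_read = spark.read.format("hudi") \
    .options(**time_travel_options)  \
    .load('s3://emr-studio-emr-on-eks/hudi-tables' + '/*/*')

df_time_travel_read.show()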

+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------+--------+--------+-----------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|cust_id|   cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+------------+--------+--------+-----------+-------------------+
|  20221129060138055|20221129060138055...|                 1|            2022-01-01|42bbae9a-3144-455...|      1|Prasad Nadig|      25|      NJ| 2022-01-01|2022-11-29 05:35:55|
|  20221129060138055|20221129060138055...|                 2|            2022-01-02|f0f97bb9-03aa-4b5...|      2|    Ethereum|      80|      NY| 2022-01-02|2022-11-29 05:35:55|
|  20221129060138055|20221129060138055...|                 5|            2022-01-05|7afe7c55-79af-4b1...|      5|  

### Incremental Query
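
Both the point-in-time read and the incremental read select records by commit time. The pure-Python sketch below models that selection under the assumption that the begin instant is exclusive and the end instant is inclusive, which is consistent with the outputs in this notebook. `incremental_read` and the sample `rows` are hypothetical and only mimic the filtering; they do not call Hudi.

```python
def incremental_read(rows, begin, end=None):
    """Model of an incremental read: keep rows with begin < commit <= end."""
    return [
        row for row in rows
        if row["commit"] > begin and (end is None or row["commit"] <= end)
    ]


# Sample rows using the two commit times seen earlier in this notebook.
rows = [
    {"commit": "20221129060138055", "cust_id": 2},
    {"commit": "20221129061653922", "cust_id": 1},
    {"commit": "20221129061653922", "cust_id": 7},
]
first_commit = "20221129060138055"

# Point-in-time view: everything from the start up to and including the first commit.
point_in_time = incremental_read(rows, "000", end=first_commit)
print([row["cust_id"] for row in point_in_time])

# Incremental view: only what was committed after the first commit.
after_insert = incremental_read(rows, first_commit)
print([row["cust_id"] for row in after_insert])
```

Because the commit times are fixed-width digit strings, plain string comparison orders them chronologically, which is why "000" works as a begin value that precedes every commit.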

In [30]:
startTime = commits[len(commits) - 2] # commit time of the initial INSERT; records committed after it are returned

# Fetch the incremental data written after the initial insert, i.e. the changes made by the UPSERT
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': startTime
}

# Keep the DataFrame in the variable; .show() returns None, so it is called separately
df_customer_incremental_read = spark.read.format("hudi") \
    .options(**incremental_read_options)  \
    .load('s3://emr-studio-emr-on-eks/hudi-tables' + '/*/*')

df_customer_incremental_read.show()

+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------+--------+--------+-----------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|cust_id|     cust_name|cust_age|cust_loc|create_date|  last_updated_time|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------+--------+--------+-----------+-------------------+
|  20221129061653922|20221129061653922...|                 1|            2022-01-01|42bbae9a-3144-455...|      1|Prasad S Nadig|      30|      NJ| 2022-01-01|2022-11-29 06:16:37|
|  20221129061653922|20221129061653922...|                 7|            2022-01-07|3477b517-c353-454...|      7|      Compound|      20|      NJ| 2022-01-07|2022-11-29 06:16:37|
+-------------------+--------------------+------------------+----------------------+--------------------+