# Demo: Batch Updates to an S3 Data Lake Using Apache Hudi

## Table of Contents:

1. [Overview](#Overview)
2. [Bulk Insert the Initial Dataset](#Bulk-Insert-the-Initial-Dataset)
3. [Batch Upsert some records](#Batch-Upsert-some-records)
4. [Deleting Records](#Deleting-Records.)
5. [Working with Partitioned Tables](#Working-with-Partitioned-Tables)

## Overview

This notebook demonstrates using PySpark with [Apache Hudi](https://aws.amazon.com/emr/features/hudi/) on Amazon EMR to upsert records into an S3 data lake.

Here are some good reference links to read later:

* [Apache Hudi concepts](https://hudi.apache.org/concepts.html)
* [How Hudi Works](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-how-it-works.html)

This notebook covers the following concepts when writing Copy-On-Write tables to an S3 data lake:

- Write Hudi Spark jobs in PySpark.
- Bulk insert the initial dataset.
- Write a multi-key partitioned table as well as a non-partitioned table.
- Tune the bulk insert write performance according to the expected number of target files.
- Sync the Hudi tables to the Hive/Glue Catalog.
- Upsert some records to a Hudi table.
- Delete records from a Hudi table.
- Understand how the Hudi commit retention policy works.

This demo runs fine on a single node (r5.4xlarge) EMR Cluster.

Let's start by initializing the Spark session that connects this notebook to our Spark EMR cluster. Note that the files hudi-spark-bundle.jar and spark-avro.jar must already be copied into HDFS, where the configuration below picks them up:

In [1]:
%%configure -f
{
    "conf":  { 
             "spark.jars":"hdfs:///hudi-spark-bundle.jar,hdfs:///spark-avro.jar",
             "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
             "spark.sql.hive.convertMetastoreParquet":"false",
             "spark.dynamicAllocation.executorIdleTimeout": 3600,
             "spark.executor.memory": "7G",
             "spark.executor.cores": 1,
             "spark.dynamicAllocation.initialExecutors":16
           } 
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9,application_1583192269291_0010,spark,idle,Link,Link,


In [2]:
## CHANGE ME ##
config = {
    "table_name": "example_hudi_table",
    "target": "s3://hudi-workshop-1900-899011185738/tmp/hudi/example_hudi_table",
    "primary_key": "id",
    "sort_key": "sk",
    "commits_to_retain": "2"
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
11,application_1583192269291_0012,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


Next, define the Hudi option keys and values as Python constants:

In [3]:
# General Constants
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.EmptyHoodieRecordPayload"

# Hive Constants
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"

# Partition Constants
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"

Some helper functions:

In [4]:
# Generates test records: sk equals id plus an optional increment.
def get_json_data(start, count, increment=0):
    data = [{"id": i, "sk": i+increment, "txt": chr(65 + (i % 26))} for i in range(start, start + count)]
    return data

# Creates a DataFrame by parsing the records as JSON, spread over 2 partitions.
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data, 2))
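
To make the shape of the generated records concrete, here is a small standalone sketch that repeats `get_json_data` and prints two records. It needs no Spark session; the start id and increment are simply the values used later for the upsert.

```python
# Same generator as in the notebook cell, repeated so this sketch runs on its own.
def get_json_data(start, count, increment=0):
    return [{"id": i, "sk": i + increment, "txt": chr(65 + (i % 26))}
            for i in range(start, start + count)]

sample = get_json_data(3000000, 2, 1)
for record in sample:
    print(record)
# {'id': 3000000, 'sk': 3000001, 'txt': 'Q'}
# {'id': 3000001, 'sk': 3000002, 'txt': 'R'}
```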


## Bulk Insert the Initial Dataset

Let's generate 4M records to load into our Data Lake:

In [5]:
df1 = create_json_df(spark, get_json_data(0, 4000000))
print(df1.count())

4000000

And write the data to S3:

In [6]:
(df1.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option(BULK_INSERT_PARALLELISM, 3)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
      .mode("Overwrite")
      .save(config['target']))

Let's check the number of files in S3. We expect 3 files, because BULK_INSERT_PARALLELISM is set to 3.
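
Since the bulk insert parallelism determines the initial number of files, one rough way to pick it is to divide the expected output size by a target file size. The sketch below is only an estimate under stated assumptions: the function name is hypothetical, and the 120 MB target is an assumed value that you should check against `hoodie.parquet.max.file.size` for your Hudi version.

```python
import math

# Assumption: 120 MB target file size; verify against your Hudi configuration.
TARGET_FILE_SIZE_BYTES = 120 * 1024 * 1024

def estimate_bulk_insert_parallelism(expected_output_bytes,
                                     target_file_bytes=TARGET_FILE_SIZE_BYTES):
    """Return how many files of the target size the expected output needs (at least 1)."""
    return max(1, math.ceil(expected_output_bytes / target_file_bytes))

# Hypothetical example: roughly 300 MB of parquet output.
parallelism = estimate_bulk_insert_parallelism(300 * 1024 * 1024)
print(parallelism)  # 3
```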

When sizing an EMR cluster, you get the best parallelism when the number of executors matches the number of files to be written. More commonly, though, the number of files is much larger than the number of executors, so the files are written in several rounds.
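
The relationship between files and executors can be sketched with a little arithmetic. This is a simplified model that assumes each executor core writes one file at a time; the function name is hypothetical, and the 200-file case is an invented example.

```python
import math

def write_waves(num_files, num_executors, cores_per_executor=1):
    """Number of sequential rounds needed if each core writes one file at a time."""
    slots = num_executors * cores_per_executor
    return math.ceil(num_files / slots)

print(write_waves(3, 16))    # 1
print(write_waves(200, 16))  # 13
```

With the 16 single-core executors configured for this notebook, the 3 files of the bulk insert fit into a single round.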

Some Apache Hudi write parameters to note when performing a bulk insert are listed below. You may not need to override all of them for every workload:

| Config | Description |
| :--- | :--- |
| hoodie.parquet.max.file.size | Target size for parquet files produced by Hudi write phases. |
| hoodie.parquet.small.file.limit | Size below which a file is treated as a small file. This should be less than hoodie.parquet.max.file.size. |
| hoodie.parquet.compression.ratio | Expected compression of parquet data used by Hudi. |
| hoodie.bulkinsert.shuffle.parallelism | Parallelism determines the initial number of files in your table. |
| hoodie.consistency.check.enabled | Additional check to handle S3’s eventual consistency model. |

Let's inspect the table created and query the data:

In [9]:
spark.sql("show tables").show(100,False)

+--------+------------------+-----------+
|database|tableName         |isTemporary|
+--------+------------------+-----------+
|default |example_hudi_table|false      |
+--------+------------------+-----------+

In [10]:
spark.sql("show create table "+config['table_name']).show(100,False)

|createtab_stmt ... (output truncated)

Note the extra columns that Hudi adds to each record to keep track of commits and file names, such as _hoodie_commit_time and _hoodie_file_name.
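
If you only want the business columns, the Hudi metadata columns can be filtered out by name. The sketch below works on a plain list of column names so that it runs without Spark; the helper name `data_columns` is hypothetical.

```python
# Metadata columns that Hudi adds to every record.
HUDI_META_COLUMNS = [
    "_hoodie_commit_time",
    "_hoodie_commit_seqno",
    "_hoodie_record_key",
    "_hoodie_partition_path",
    "_hoodie_file_name",
]

def data_columns(all_columns):
    """Return the column names without Hudi's metadata columns, preserving order."""
    return [c for c in all_columns if c not in HUDI_META_COLUMNS]

# Column list as it would look for the table in this notebook.
columns = HUDI_META_COLUMNS + ["id", "sk", "txt"]
print(data_columns(columns))  # ['id', 'sk', 'txt']
# With a DataFrame, the same idea is: df.select(*data_columns(df.columns))
```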

In [11]:
df2=spark.read.format(HUDI_FORMAT).load(config["target"]+"/*")
df2.count()

4000000

We can query the Hive table as well:

In [12]:
spark.sql("select count(*) from "+config['table_name']).show(100,False)

+--------+
|count(1)|
+--------+
|4000000 |
+--------+

## Batch Upsert some records

Let's modify a few records. First, look at their current values:

In [13]:
spark.sql("select id, sk from "+config['table_name'] +" where id between 3000000 and 3000010").show(100,False)

+-------+-------+
|id     |sk     |
+-------+-------+
|3000000|3000000|
|3000001|3000001|
|3000002|3000002|
|3000003|3000003|
|3000004|3000004|
|3000005|3000005|
|3000006|3000006|
|3000007|3000007|
|3000008|3000008|
|3000009|3000009|
|3000010|3000010|
+-------+-------+

In [14]:
df2 = create_json_df(spark, get_json_data(3000000, 10, 1))
df2.count()

10

In [15]:
df2.select("id","sk").show()

+-------+-------+
|     id|     sk|
+-------+-------+
|3000000|3000001|
|3000001|3000002|
|3000002|3000003|
|3000003|3000004|
|3000004|3000005|
|3000005|3000006|
|3000006|3000007|
|3000007|3000008|
|3000008|3000009|
|3000009|3000010|
+-------+-------+

We have incremented the value in the sk column by 1 for those 10 records. Let's write the changes to S3. Note that the operation is now Upsert, as opposed to the Bulk Insert used for the initial load:

```
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
```
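
To build intuition for what an upsert does with the record key and the precombine field, here is a deliberately simplified in-memory model. It is not Hudi's implementation: it only assumes that duplicates within a batch are resolved in favour of the largest precombine value and that the surviving records replace stored records with the same key.

```python
def upsert(table, batch, key="id", precombine="sk"):
    """Simplified model of an upsert; not Hudi's implementation.

    1. Within the incoming batch, keep one record per key: the one with the
       largest precombine value.
    2. Write the surviving records over any stored record with the same key.
    """
    latest = {}
    for record in batch:
        current = latest.get(record[key])
        if current is None or record[precombine] > current[precombine]:
            latest[record[key]] = record
    merged = dict(table)
    merged.update(latest)
    return merged

# Stored table keyed by id, mirroring ids 3000000..3000010.
table = {i: {"id": i, "sk": i} for i in range(3000000, 3000011)}
# Incoming batch: sk incremented by 1 for the first 10 ids.
batch = [{"id": i, "sk": i + 1} for i in range(3000000, 3000010)]

table = upsert(table, batch)
print(table[3000000]["sk"])  # 3000001 (updated)
print(table[3000010]["sk"])  # 3000010 (not in the batch, unchanged)
print(len(table))            # 11
```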

In [16]:
(df2.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)  
      .mode("Append")
      .save(config['target']))

Let's check the number of files in S3. Expected: 4 files.

Rerun the previous cell and check the number of files again.

Then rerun it once more and check again.

Notice that the number of files does not grow beyond initial files (3) + commits_to_retain (2) = 5 files. This is because the Hudi cleaner deletes older file versions on each write, as the KEEP_LATEST_COMMITS policy is configured with commits_to_retain set to 2.
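
The file counts described above can be reproduced with a toy model. It assumes that the 10 updated records all live in one file group, that every upsert adds one new file version to that group, and that cleaning leaves at most commits_to_retain older versions next to the latest one. This is a sketch to match the counts in the text, not Hudi's actual cleaning algorithm.

```python
def simulate_file_counts(initial_files, commits_to_retain, num_upserts):
    """Toy model of file counts after each upsert (not Hudi's real cleaner)."""
    versions = [1] * initial_files  # file versions per file group
    counts = []
    for _ in range(num_upserts):
        versions[0] += 1  # assumption: the updated records live in one file group
        versions[0] = min(versions[0], commits_to_retain + 1)  # cleaning step
        counts.append(sum(versions))
    return counts

print(simulate_file_counts(initial_files=3, commits_to_retain=2, num_upserts=4))
# [4, 5, 5, 5]
```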

Let's query our changed records:

In [17]:
df2=spark.read.format(HUDI_FORMAT).load(config["target"]+"/*")
df2.count()

4000000

In [18]:
spark.sql("select id, sk from "+config['table_name'] +" where id between 3000000 and 3000010").show(100,False)

+-------+-------+
|id     |sk     |
+-------+-------+
|3000000|3000001|
|3000001|3000002|
|3000002|3000003|
|3000003|3000004|
|3000004|3000005|
|3000005|3000006|
|3000006|3000007|
|3000007|3000008|
|3000008|3000009|
|3000009|3000010|
|3000010|3000010|
+-------+-------+

We can observe that the 10 records are updated, while id 3000010, which was not part of the batch, is unchanged.

## Deleting Records.

Apache Hudi supports two types of deletes on data stored in Hudi datasets, by letting the user specify a different record payload implementation.

* **Soft Deletes**: With a soft delete, the user retains the record key and nulls out the values of all other fields. This is achieved by making sure the relevant fields are nullable in the dataset schema and then upserting the records with those fields set to null.
    
* **Hard Deletes**: A stronger form of delete that physically removes any trace of the record from the dataset.
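
The difference between the two delete types can be illustrated on plain Python records. The helper names below are hypothetical and the sketch only models the effect on the data, not how Hudi performs the deletes.

```python
def soft_delete(records, keys_to_delete, key="id"):
    """Keep the key but set every other field to None for the selected records."""
    return [
        {f: (v if f == key else None) for f, v in r.items()} if r[key] in keys_to_delete else r
        for r in records
    ]

def hard_delete(records, keys_to_delete, key="id"):
    """Remove the selected records entirely."""
    return [r for r in records if r[key] not in keys_to_delete]

records = [{"id": 1, "sk": 1, "txt": "B"}, {"id": 2, "sk": 2, "txt": "C"}]
print(soft_delete(records, {1}))
# [{'id': 1, 'sk': None, 'txt': None}, {'id': 2, 'sk': 2, 'txt': 'C'}]
print(hard_delete(records, {1}))
# [{'id': 2, 'sk': 2, 'txt': 'C'}]
```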

Let's now run a hard delete, which will remove the records from our dataset.

In [19]:
df2 = create_json_df(spark, get_json_data(3000000, 10, 1))
df2.count()

10

Let's now delete these 10 records. Note that the only change from the upsert is a single line that sets hoodie.datasource.write.payload.class to org.apache.hudi.EmptyHoodieRecordPayload:

```
.option(PAYLOAD_CLASS_OPT_KEY, EMPTY_PAYLOAD_CLASS_OPT_VAL)
```
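
Because the upsert and delete writes share almost all of their options, it can help to build them from one place. The helper below is a hypothetical sketch covering only the core options; the option keys and the payload class are the ones defined as constants earlier in this notebook.

```python
def write_options(config, operation, delete=False):
    """Collect the core write options; a delete only adds the empty payload class."""
    options = {
        "hoodie.table.name": config["table_name"],
        "hoodie.datasource.write.recordkey.field": config["primary_key"],
        "hoodie.datasource.write.precombine.field": config["sort_key"],
        "hoodie.datasource.write.operation": operation,
    }
    if delete:
        options["hoodie.datasource.write.payload.class"] = "org.apache.hudi.EmptyHoodieRecordPayload"
    return options

config = {"table_name": "example_hudi_table", "primary_key": "id", "sort_key": "sk"}
upsert_opts = write_options(config, "upsert")
delete_opts = write_options(config, "upsert", delete=True)

print(sorted(set(delete_opts) - set(upsert_opts)))
# ['hoodie.datasource.write.payload.class']
```

A dictionary like this can be passed to a DataFrame writer with `.options(**delete_opts)` instead of chaining individual `.option(...)` calls.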

In [20]:
(df2.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
      .option(PAYLOAD_CLASS_OPT_KEY, EMPTY_PAYLOAD_CLASS_OPT_VAL)
      .mode("Append")
      .save(config['target']))

In [22]:
spark.sql("select id, sk from "+config['table_name'] +" where id between 3000000 and 3000009").show(100,False)

+---+---+
|id |sk |
+---+---+
+---+---+

We can observe that the records no longer exist in our table.

## Working with Partitioned Tables

Let's do the same thing with Partitioned Tables.

In [24]:
## CHANGE ME ##
config = {
    "table_name": "example_hudi_partitioned_table",
    "target": "s3://hudi-workshop-1900-899011185738/tmp/hudi/example_hudi_partitioned_table",
    "primary_key": "id",
    "sort_key": "sk",
    "commits_to_retain": "2",
    "partition_keys" : "year,month"
}

In [25]:
import random

## Generates Data - we are adding year and month columns this time.
def get_json_data(start, count, increment=0):
    data = [{"id": i, "sk": i+increment, "txt": chr(65 + (i % 26)), "year" : "2019", "month": random.randint(1,12) } for i in range(start, start + count)]
    return data

# Creates the Dataframe
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data, 2))


Let's generate the data:

In [26]:
df1 = create_json_df(spark, get_json_data(0, 4000000))
print(df1.count())

4000000

We add a partitionKey column to the DataFrame, which combines year and month into a Hive-style partition path:

In [27]:
from pyspark.sql.functions import concat, col, lit

hudiTablePartitionKey="partitionKey"
df1 = df1.withColumn(hudiTablePartitionKey,concat(lit("year="),col("year"),lit("/month="),col("month")))
df1.select(hudiTablePartitionKey).show(5)

+------------------+
|      partitionKey|
+------------------+
| year=2019/month=8|
|year=2019/month=11|
| year=2019/month=5|
| year=2019/month=9|
| year=2019/month=4|
+------------------+
only showing top 5 rows
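
The same partition path can be built in plain Python, which makes the format easy to check without Spark. The function name is hypothetical; the field names match the year and month columns generated above.

```python
def partition_path(record, partition_fields=("year", "month")):
    """Build a Hive-style partition path such as 'year=2019/month=8'."""
    return "/".join(f"{field}={record[field]}" for field in partition_fields)

print(partition_path({"id": 0, "year": "2019", "month": 8}))  # year=2019/month=8
```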

We can now write the data to S3. Notice that the statement below sets the Hive partition fields and the partition path field, and that the Hive partition extractor class has changed:

```
      .option(HIVE_PARTITION_FIELDS_OPT_KEY, config["partition_keys"])
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,MULIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL)
      .option(PARTITIONPATH_FIELD_OPT_KEY,"partitionKey")
```
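
The partition fields and the partition path have to line up: each segment of the path supplies the value for one partition field. The sketch below is a simplified model of what a multi-part extractor does, written as an assumption for illustration rather than taken from Hudi's source.

```python
def extract_partition_values(path):
    """Simplified model: split the path on '/' and keep the part after '='."""
    return [segment.split("=", 1)[-1] for segment in path.split("/")]

partition_keys = "year,month".split(",")
values = extract_partition_values("year=2019/month=8")
print(dict(zip(partition_keys, values)))  # {'year': '2019', 'month': '8'}

# The number of path segments has to match the number of partition fields.
assert len(values) == len(partition_keys)
```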


In [28]:
(df1.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option(BULK_INSERT_PARALLELISM, 3)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HIVE_PARTITION_FIELDS_OPT_KEY, config["partition_keys"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,MULIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL)
      .option(PARTITIONPATH_FIELD_OPT_KEY,"partitionKey")
      .mode("Overwrite")
      .save(config['target']))

In [29]:
spark.sql("show create table "+config['table_name']).show(100,False)

|createtab_stmt ... (output truncated)

We can see that the partition fields are present in our Hive table:

```
PARTITIONED BY (`year` STRING, `month` BIGINT)
```

Let's now query the data and group by the partition columns:

In [30]:
spark.sql("Select year, month, count(*) from "+config['table_name']+" group by year, month order by month").show(100,False)

+----+-----+--------+
|year|month|count(1)|
+----+-----+--------+
|2019|1    |334392  |
|2019|2    |333119  |
|2019|3    |332149  |
|2019|4    |333698  |
|2019|5    |332845  |
|2019|6    |333531  |
|2019|7    |333415  |
|2019|8    |333210  |
|2019|9    |333894  |
|2019|10   |332765  |
|2019|11   |333430  |
|2019|12   |333552  |
+----+-----+--------+

The other operations, such as Upsert and Delete, behave the same way on partitioned tables.