# Lab : Batch Updates to a S3 Datalake using Apache Hudi

# Merge On Read


## Table of Contents:

1. [Overview](#Overview)
2. [Merge on Read](#Merge-On-Read) 
   2.1 [Bulk Insert the Initial Dataset](#Bulk-Insert-the-Initial-Dataset)
   2.2 [Batch Upsert some records](#Batch-Upsert-some-records)
   2.3 [Deleting Records](#Deleting-Records.)


## Overview

This notebook demonstrates using PySpark on [Apache Hudi](https://aws.amazon.com/emr/features/hudi/) on Amazon EMR to insert/upsert/delete records to an S3 data lake.

Here is a good reference link to read later:

* [How Hudi Works](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-how-it-works.html)

This notebook covers the following concepts when writing  Merge-On-Read tables to an S3 Datalake:

- Write Hudi Spark jobs in PySpark.
- Bulk Insert the Initial Dataset.
- Sync the Hudi tables to the Hive/Glue Catalog.
- Upsert some records to a Hudi MOR table.
- Delete records from a Hudi MOR table.



#### Pre-requisites

### This demo is based on Hudi version 0.5.0-incubating and runs fine on Jupyter Notebooks connected to a 1 node (r5.4xlarge) EMR cluster with configuration listed below 

 - EMR versions 5.29.0 or 6.0.0 
 
 - Software configuration

       - Hadoop 2.8.5
       - Hive 2.3.6
       - Livy 0.6.0
       - JupyterHub 1.0.0
       - Spark 2.4.4
       
       
 - AWS Glue Data Catalog settings - Select the below listed check boxes
       - Use for Hive table metadata  
       - Use for Spark table metadata



### Connect to the Master Node of EMR cluster Using SSH :
    - ssh -i ~/xxxx.pem hadoop@<ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com>

    - Ensure  the below listed files are copied into HDFS.

    - hadoop fs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar hdfs:///user/hadoop/

    - hadoop fs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar hdfs:///user/hadoop/

    - hadoop fs -copyFromLocal /usr/lib/spark/jars/httpclient-4.5.9.jar hdfs:///user/hadoop/

Let's start by initializing the Spark Session to connect this notebook to our Spark EMR cluster:

In [1]:
%%configure -f
{
    "conf":  { 
             "spark.jars":"hdfs:///user/hadoop/httpclient-4.5.9.jar, hdfs:///user/hadoop/hudi-spark-bundle.jar,hdfs:///user/hadoop/spark-avro.jar",
             "spark.sql.hive.convertMetastoreParquet":"false",     
             "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
             "spark.dynamicAllocation.executorIdleTimeout": 3600,
             "spark.executor.memory": "7G",
             "spark.executor.cores": 1,
             "spark.dynamicAllocation.initialExecutors":16
           } 
}

The constants for Python to use:

In [2]:
# General Constants
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.EmptyHoodieRecordPayload"

# Hive Constants
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"

# Partition Constants
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"

#Incremental Constants
VIEW_TYPE_OPT_KEY="hoodie.datasource.view.type"
BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
47,application_1593789203399_0081,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Functions to create JSON data and Spark dataframe from this data

### Merge On Read 

For near real-time applications that mandate quick upserts, MERGE_ON_READ table type would be better suited. MOR table stores incoming upserts for each file group, onto a row based delta log (In Avro file format). This log is then merged with the existing Parquet file using a compactor during reads.

In [3]:
## CHANGE ME ##
config = {
    "table_name": "hudi_mor_trips_table",
    "target": "s3://<Your S3 Bucket Here>/tmp/hudi/hudi_mor_trips_table",
    "primary_key": "trip_id",
    "sort_key": "tstamp",
    "commits_to_retain": "2",
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
STORAGE_TYPE_OPT_KEY="hoodie.datasource.write.storage.type"
COMPACTION_INLINE_OPT_KEY="hoodie.compact.inline"
COMPACTION_MAX_DELTA_COMMITS_OPT_KEY="hoodie.compact.inline.max.delta.commits"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
## Generates Data

from datetime import datetime

def get_json_data(start, count, dest):
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    data = [{"trip_id": i, "tstamp": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
    return data

# Creates the Dataframe
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Bulk Insert the Initial Dataset

Let's generate 2M records to load into our Data Lake:

In [6]:
mor_dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson","Washington DC","Philadelphia","Miami","San Francisco"]
df2 = create_json_df(spark, get_json_data(0, 2000000, mor_dest))
df2.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+--------+-------+-------------------+
|  destination|route_id|trip_id|             tstamp|
+-------------+--------+-------+-------------------+
|      Seattle|       A|      0|2020-07-16 10:39:11|
|     New York|       B|      1|2020-07-16 10:39:11|
|   New Jersey|       C|      2|2020-07-16 10:39:11|
|  Los Angeles|       D|      3|2020-07-16 10:39:11|
|    Las Vegas|       E|      4|2020-07-16 10:39:11|
|       Tucson|       F|      5|2020-07-16 10:39:11|
|Washington DC|       G|      6|2020-07-16 10:39:11|
| Philadelphia|       H|      7|2020-07-16 10:39:11|
|        Miami|       I|      8|2020-07-16 10:39:11|
|San Francisco|       J|      9|2020-07-16 10:39:11|
|      Seattle|       A|     10|2020-07-16 10:39:11|
|     New York|       B|     11|2020-07-16 10:39:11|
|   New Jersey|       C|     12|2020-07-16 10:39:11|
|  Los Angeles|       D|     13|2020-07-16 10:39:11|
|    Las Vegas|       E|     14|2020-07-16 10:39:11|
|       Tucson|       F|     15|2020-07-16 10:


Bulk insert will take the same time as COW as this is the first write 

We will be using the Copy on write storage option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ") which  is to be explicitly set.

In [None]:
(df2.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option(BULK_INSERT_PARALLELISM, 3)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
      .option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ")
      .option(COMPACTION_INLINE_OPT_KEY, "false")
      .option(COMPACTION_MAX_DELTA_COMMITS_OPT_KEY, "20")
      .mode("Overwrite")
      .save(config['target']))

### Verify the number of files 

Let us check the contents of S3 path. Bulk insert operation on Copy-On-Write and Merge-On-Read tables is identical in terms of performance. 

```
$ aws s3 ls s3://<Your S3 Bucket Here>/tmp/hudi/hudi_mor_trips_table/
                           PRE .hoodie/
2020-04-28 23:30:20          0 .hoodie_$folder$
2020-04-28 23:30:26         93 .hoodie_partition_metadata
2020-04-28 23:30:33    4378000 45b1ce07-f9ac-496d-8b03-20af011a0c44-0_1-194-3566_20200428233020.parquet
2020-04-28 23:30:34    5048941 932d5e97-c5f0-4c91-a7f6-f65d487a5e2b-0_2-194-3567_20200428233020.parquet
2020-04-28 23:30:34    5065824 ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_0-194-3565_20200428233020.parquet
```

Notice the delta commits 

```
$ aws s3 ls s3://<Your S3 Bucket Here>/tmp/hudi/hudi_mor_trips_table/.hoodie/
2020-04-28 23:30:21          0 .aux_$folder$
2020-04-28 23:30:21          0 .temp_$folder$
2020-04-28 23:30:37       1077 20200428233020.clean
2020-04-28 23:30:36       4929 20200428233020.deltacommit
2020-04-28 23:30:21          0 archived_$folder$
2020-04-28 23:30:21        264 hoodie.properties
```

This is the first commit 

## Batch Upsert some records


Now let us try to upsert some records into MOR table

In [None]:
upsert_dest = ["San Diego", "San Diego", "San Diego", "San Diego", "San Diego","San Diego","San Diego","San Diego","San Diego","San Diego"]
df3 = create_json_df(spark, get_json_data(1000000, 10, upsert_dest))
df3.count()
df3.show()

In [None]:
(df3.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)  
      .option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ")
      .option(COMPACTION_INLINE_OPT_KEY, "false")
      .option(COMPACTION_MAX_DELTA_COMMITS_OPT_KEY, "20")
      .mode("Append")
      .save(config['target']))

### Verify the number of tables created


In [None]:
spark.sql("show tables").show(20,False)


We can observe that there are two tables created for  Merge On Read storage option. 

1.hudi_mor_trips_table     
2.hudi_mor_trips_table_rt  


Lets query the table  -> hudi_mor_trips_table 

In [None]:
spark.sql("select trip_id, route_id, destination, tstamp from "+config['table_name']+" where trip_id between 999996 and 1000010").show(20,False)

Lets query the real-time table -> hudi_mor_trips_table_rt

In [None]:
spark.sql("select trip_id, route_id, destination, tstamp from "+config['table_name']+"_rt where trip_id between 999996 and 1000010").show(20,False)

Check the S3 path again. There is no change in number of Parquet files after upsert operation unlike Copy-On-Write tables. 

```
$ aws s3 ls s3://<Your S3 Bucket Here>/tmp/hudi/hudi_mor_trips_table/
                           PRE .hoodie/
2020-04-28 23:33:22       2071 .ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_20200428233020.log.1_0-227-3837
2020-04-28 23:30:20          0 .hoodie_$folder$
2020-04-28 23:30:26         93 .hoodie_partition_metadata
2020-04-28 23:30:33    4378000 45b1ce07-f9ac-496d-8b03-20af011a0c44-0_1-194-3566_20200428233020.parquet
2020-04-28 23:30:34    5048941 932d5e97-c5f0-4c91-a7f6-f65d487a5e2b-0_2-194-3567_20200428233020.parquet
2020-04-28 23:30:34    5065824 ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_0-194-3565_20200428233020.parquet

```


## Deleting Records.

Apache Hudi supports implementing two types of deletes on data stored in Hudi datasets, by enabling the user to specify a different record payload implementation.

* **Soft Deletes** : With soft deletes, user wants to retain the key but just null out the values for all other fields. This can be simply achieved by ensuring the appropriate fields are nullable in the dataset schema and simply upserting the dataset after setting these fields to null.
    
* **Hard Deletes** : A stronger form of delete is to physically remove any trace of the record from the dataset. 

Let's now execute some hard delete operations on our dataset which will remove the records from our dataset.

Let's delete the 10 records with the "San Diego" destination we upserted to the table. Note that the only change is the single line that set the hoodie.datasource.write.payload.class to org.apache.hudi.EmptyHoodieRecordPayload to delete the records.

```
.option(PAYLOAD_CLASS_OPT_KEY, EMPTY_PAYLOAD_CLASS_OPT_VAL)
```

In [None]:
(df3.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
      .option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ")
      .option(PAYLOAD_CLASS_OPT_KEY, EMPTY_PAYLOAD_CLASS_OPT_VAL)
      .mode("Append")
      .save(config['target']))




Query and observe that the updated records do not exist in the two tables after deletion. 

1.hudi_mor_trips_table     
2.hudi_mor_trips_table_rt  


In [None]:
spark.sql("select trip_id, route_id, destination, tstamp from "+config['table_name'] +" where trip_id between 999996 and 1000010").show(20,False)

In [None]:
spark.sql("select trip_id, route_id, destination, tstamp from "+config['table_name']+"_rt where trip_id between 999996 and 1000010").show(20,False)


Check the count of records in the two tables after deletion. 

1.hudi_mor_trips_table     
2.hudi_mor_trips_table_rt  


observe that total record count indicates that 10 upserted records have been deleted

In [None]:
spark.sql("select count(*) from "+config['table_name']+"_rt" ).show(20,False)

In [None]:
spark.sql("select count(*) from "+config['table_name']).show(20,False)