## Overview

This notebook demonstrates how to use PySpark with [Apache Hudi](https://aws.amazon.com/emr/features/hudi/) on Amazon EMR to insert, upsert, and delete records in an S3 data lake.

Here are some good reference links to read later:

* [Apache Hudi concepts](https://hudi.apache.org/concepts.html)
* [How Hudi Works](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-how-it-works.html)

This notebook covers the following concepts when writing Copy-on-Write (COW) and Merge-on-Read (MOR) tables to an S3 data lake:

- Write Hudi Spark jobs in PySpark.
- Bulk insert the initial dataset.
- Write a multi-key partitioned table as well as a non-partitioned table.
- Tune bulk insert write performance according to the expected number of target files.
- Sync the Hudi tables to the Hive/Glue Catalog.
- Upsert records into a Hudi table.
- Delete records from a Hudi table.
- Understand how the Hudi commit retention policy works.
- Time travel with Hudi tables using incremental and point-in-time queries.
- Perform insert/upsert/delete operations on a Merge-on-Read table and understand the differences between MOR and COW tables.


#### This demo runs fine on a single-node (r5.4xlarge) EMR cluster with release 5.30 or later.

Let's start by initializing the Spark session to connect this notebook to our EMR Spark cluster.

Note that the configuration below expects hudi-spark-bundle.jar, spark-avro.jar, and httpclient-4.5.9.jar to have been copied into HDFS first:

```
hadoop fs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar hdfs:///user/hadoop/
hadoop fs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar hdfs:///user/hadoop/
hadoop fs -copyFromLocal /usr/lib/spark/jars/httpclient-4.5.9.jar hdfs:///user/hadoop/
```

In [1]:
%%configure -f
{
    "conf":  { 
             "spark.jars":"hdfs:///user/hadoop/httpclient-4.5.9.jar,hdfs:///user/hadoop/hudi-spark-bundle.jar,hdfs:///user/hadoop/spark-avro.jar",
             "spark.sql.hive.convertMetastoreParquet":"false",     
             "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
             "spark.dynamicAllocation.executorIdleTimeout": 60
           } 
}

## Configuration

The following configuration items are used throughout this demo.

Make sure to point the `target` parameter to your S3 bucket: replace `<YOUR-S3-BUCKET>` with the name of an S3 bucket in your AWS account, in the same Region as your EMR cluster.
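Since every write in this notebook goes to `config["target"]`, it can help to fail fast when the placeholder has not been replaced. The following is a minimal sketch: `check_target` is a hypothetical helper (not part of Hudi or EMR), and the two checks it performs are assumptions about what a valid target looks like.

```python
def check_target(config):
    """Hypothetical sanity check for config["target"] (not part of Hudi or EMR).

    Raises ValueError if the target is not an s3:// URI or still contains the
    <YOUR-S3-BUCKET> placeholder; otherwise returns the target unchanged.
    """
    target = config["target"]
    if not target.startswith("s3://"):
        raise ValueError("target should be an s3:// URI, got: " + target)
    if "<YOUR-S3-BUCKET>" in target:
        raise ValueError("replace <YOUR-S3-BUCKET> in config['target'] with your bucket name")
    return target
```

Calling `check_target(config)` right after `config` is defined stops the notebook before any Spark job tries to write to a non-existent bucket.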

In [2]:
## CHANGE ME ##
config = {
    "table_name": "hudi_trips_table",
    "target": "s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_trips_table",
    "primary_key": "trip_id",
    "sort_key": "tstamp",
    "commits_to_retain": "10"
}

Starting Spark application

SparkSession available as 'spark'.

Next, define the Python constants for the Hudi option keys and values:

In [3]:
# General Constants
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.common.model.EmptyHoodieRecordPayload"

# Hive Constants
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"

# Partition Constants
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"

#Incremental Constants
VIEW_TYPE_OPT_KEY="hoodie.datasource.view.type"
BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"
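The write cells below repeat the same options and differ only in the operation, the shuffle parallelism, the cleaner settings, and (for the delete) the payload class. A minimal sketch of collecting them in one place follows; `hudi_write_options` is a hypothetical helper that spells out the same option keys as the constants above so it runs on its own, and it assumes the non-partitioned, Hive-synced table used in this section.

```python
def hudi_write_options(config, operation, parallelism, delete=False):
    """Hypothetical helper: build the Hudi write options used in this notebook.

    Keys are the same strings as the constants defined above. Assumes a
    non-partitioned table that is synced to the Hive/Glue catalog.
    """
    opts = {
        "hoodie.table.name": config["table_name"],
        "hoodie.datasource.write.recordkey.field": config["primary_key"],
        "hoodie.datasource.write.precombine.field": config["sort_key"],
        "hoodie.datasource.write.operation": operation,
        "hoodie.consistency.check.enabled": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.table": config["table_name"],
        "hoodie.datasource.hive_sync.partition_extractor_class":
            "org.apache.hudi.hive.NonPartitionedExtractor",
        "hoodie.datasource.write.keygenerator.class":
            "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    }
    if operation == "bulk_insert":
        # The initial load only sets the bulk insert parallelism
        opts["hoodie.bulkinsert.shuffle.parallelism"] = str(parallelism)
    else:
        # Upserts (and the delete) also set the cleaner retention policy
        opts["hoodie.upsert.shuffle.parallelism"] = str(parallelism)
        opts["hoodie.cleaner.policy"] = "KEEP_LATEST_COMMITS"
        opts["hoodie.cleaner.commits.retained"] = config["commits_to_retain"]
    if delete:
        # Hard delete: the written keys are replaced by an empty payload
        opts["hoodie.datasource.write.payload.class"] = (
            "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
    return opts
```

For example, the upsert cell could be written as `df3.write.format(HUDI_FORMAT).options(**hudi_write_options(config, UPSERT_OPERATION_OPT_VAL, 20)).mode("Append").save(config["target"])`, using PySpark's `DataFrameWriter.options`. Keeping the explicit `.option(...)` chains, as the notebook does, has the advantage of showing every setting inline.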


Helper functions to generate JSON records and create a Spark DataFrame from them:

In [4]:
## Generates Data

from datetime import datetime

def get_json_data(start, count, dest):
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    data = [{"trip_id": i, "tstamp": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
    return data

# Creates the Dataframe
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))



### Bulk Insert the Initial Dataset

Let's generate 2M records to load into our data lake:

In [5]:
dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson","Washington DC","Philadelphia","Miami","San Francisco"]
df1 = create_json_df(spark, get_json_data(0, 2000000, dest))
print(df1.count())
df1.show()


2000000
+-------------+--------+-------+-------------------+
|  destination|route_id|trip_id|             tstamp|
+-------------+--------+-------+-------------------+
|      Seattle|       A|      0|2020-07-21 22:47:17|
|     New York|       B|      1|2020-07-21 22:47:17|
|   New Jersey|       C|      2|2020-07-21 22:47:17|
|  Los Angeles|       D|      3|2020-07-21 22:47:17|
|    Las Vegas|       E|      4|2020-07-21 22:47:17|
|       Tucson|       F|      5|2020-07-21 22:47:17|
|Washington DC|       G|      6|2020-07-21 22:47:17|
| Philadelphia|       H|      7|2020-07-21 22:47:17|
|        Miami|       I|      8|2020-07-21 22:47:17|
|San Francisco|       J|      9|2020-07-21 22:47:17|
|      Seattle|       A|     10|2020-07-21 22:47:17|
|     New York|       B|     11|2020-07-21 22:47:17|
|   New Jersey|       C|     12|2020-07-21 22:47:17|
|  Los Angeles|       D|     13|2020-07-21 22:47:17|
|    Las Vegas|       E|     14|2020-07-21 22:47:17|
|       Tucson|       F|     15|2020-0

And write the data to S3:

In [6]:
(df1.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option(BULK_INSERT_PARALLELISM, 3)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
      .mode("Overwrite")
      .save(config['target']))


Let's observe the number of files in S3. We expect 3 data files, because `BULK_INSERT_PARALLELISM` is set to 3:
```

$ aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_trips_table/ --summarize --human-readable
                           PRE .hoodie/
2020-04-28 23:11:39    0 Bytes .hoodie_$folder$
2020-04-28 23:11:52   93 Bytes .hoodie_partition_metadata
2020-04-28 23:11:59    4.8 MiB 0eeafaf2-0110-4056-b509-724b11a4c0b6-0_0-7-67_20200428231141.parquet
2020-04-28 23:11:59    4.4 MiB 1ce9cbac-56e4-477e-a2bc-33fe62b3550f-0_1-7-68_20200428231141.parquet
2020-04-28 23:11:59    4.6 MiB 579443dc-8f56-4990-bed6-a527f21e9682-0_2-7-69_20200428231141.parquet

Total Objects: 5
   Total Size: 13.8 MiB

```


Let's list the table's files from the notebook, then inspect the table that was created and query the data:

In [9]:
%%local
!aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_trips_table/ --summarize --human-readable

                           PRE .hoodie/
2020-07-21 22:48:02    0 Bytes .hoodie_$folder$
2020-07-21 22:48:22   93 Bytes .hoodie_partition_metadata
2020-07-21 22:48:32    5.0 MiB 4b61fb70-cde1-488b-abc7-c1c2f85c0092-0_1-8-12_20200721224758.parquet
2020-07-21 22:48:31    4.3 MiB 7790f62b-531e-47cb-8b0f-9eb826e0f7fb-0_0-8-11_20200721224758.parquet
2020-07-21 22:48:35    4.5 MiB 7b0c13f2-6017-4c92-981d-5675559dbc97-0_2-8-13_20200721224758.parquet

Total Objects: 5
   Total Size: 13.8 MiB


In [10]:
spark.sql("show tables").show(20,False)


+--------+-----------------------+-----------+
|database|tableName              |isTemporary|
+--------+-----------------------+-----------+
|default |hudi_mor_trips_table_ro|false      |
|default |hudi_mor_trips_table_rt|false      |
|default |hudi_trips_table       |false      |
+--------+-----------------------+-----------+

In [11]:
spark.sql("show create table "+config['table_name']).show(20,False)


+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                                                                                                                     

Note the extra `_hoodie_*` columns that Hudi adds to keep track of commits, record keys, partition paths, and file names.

In [12]:
df2=spark.read.format(HUDI_FORMAT).load(config["target"]+"/*")
df2.count()
df2.printSchema()
#df2.show(20,False)


root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- route_id: string (nullable = true)
 |-- trip_id: long (nullable = true)
 |-- tstamp: string (nullable = true)

We can query the Hive table as well:

In [13]:
spark.sql("select count(*) from "+config['table_name']).show(20,False)


+--------+
|count(1)|
+--------+
|2000000 |
+--------+

### Batch Upsert some records

Let's modify a few records:

In [14]:
spark.sql("select trip_id, route_id, destination, tstamp from "+config['table_name'] +" where trip_id between 1000000 and 1000009").show(20,False)


+-------+--------+-------------+-------------------+
|trip_id|route_id|destination  |tstamp             |
+-------+--------+-------------+-------------------+
|1000000|A       |Seattle      |2020-07-21 22:47:17|
|1000001|B       |New York     |2020-07-21 22:47:17|
|1000002|C       |New Jersey   |2020-07-21 22:47:17|
|1000003|D       |Los Angeles  |2020-07-21 22:47:17|
|1000004|E       |Las Vegas    |2020-07-21 22:47:17|
|1000005|F       |Tucson       |2020-07-21 22:47:17|
|1000006|G       |Washington DC|2020-07-21 22:47:17|
|1000007|H       |Philadelphia |2020-07-21 22:47:17|
|1000008|I       |Miami        |2020-07-21 22:47:17|
|1000009|J       |San Francisco|2020-07-21 22:47:17|
+-------+--------+-------------+-------------------+

In [15]:
upsert_dest = ["Boston"] * 10
df3 = create_json_df(spark, get_json_data(1000000, 10, upsert_dest))
df3.count()


10

In [16]:
df3.select("trip_id","route_id","tstamp","destination").show()


+-------+--------+-------------------+-----------+
|trip_id|route_id|             tstamp|destination|
+-------+--------+-------------------+-----------+
|1000000|       A|2020-07-21 22:52:38|     Boston|
|1000001|       B|2020-07-21 22:52:38|     Boston|
|1000002|       C|2020-07-21 22:52:38|     Boston|
|1000003|       D|2020-07-21 22:52:38|     Boston|
|1000004|       E|2020-07-21 22:52:38|     Boston|
|1000005|       F|2020-07-21 22:52:38|     Boston|
|1000006|       G|2020-07-21 22:52:38|     Boston|
|1000007|       H|2020-07-21 22:52:38|     Boston|
|1000008|       I|2020-07-21 22:52:38|     Boston|
|1000009|       J|2020-07-21 22:52:38|     Boston|
+-------+--------+-------------------+-----------+

We have changed the destination and timestamp for trip IDs 1000000 to 1000009. Now, let's upsert the changes to S3. Note that the operation is now "upsert" instead of the "bulk_insert" used for the initial load:

```
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)

```
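Conceptually, an upsert with Hudi's default payload class (`OverwriteWithLatestAvroPayload`, which this notebook does not override for upserts) behaves like the simplified model below. It is a hypothetical sketch of our understanding for the Hudi version on EMR 5.30, not Hudi's implementation: duplicates of a record key within the incoming batch are combined by keeping the record with the largest precombine value (`tstamp` here), and each surviving record then replaces the stored record with the same key, or is inserted if the key is new. Because `tstamp` is a `yyyy-MM-dd HH:mm:ss` string, comparing it as a string also orders it chronologically.

```python
def simulate_upsert(table, batch, key="trip_id", precombine="tstamp"):
    """Hypothetical model of a Hudi upsert on a dict {record_key: record}.

    Returns a new dict; `table` is not modified.
    """
    # 1. Combine duplicates inside the incoming batch: keep the largest precombine value
    deduped = {}
    for rec in batch:
        k = rec[key]
        if k not in deduped or rec[precombine] > deduped[k][precombine]:
            deduped[k] = rec
    # 2. Existing keys are updated, new keys are inserted
    result = dict(table)
    result.update(deduped)
    return result
```

The same model explains the insert step later in this section: keys that are not yet in the table (such as 2000000 to 2000009) simply end up as new entries.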

In [17]:
(df3.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)  
      .mode("Append")
      .save(config['target']))


In [18]:
spark.sql("select trip_id, route_id, tstamp, destination from "+config['table_name'] +" where trip_id between 999996 and 1000013").show(50,False)


+-------+--------+-------------------+-------------+
|trip_id|route_id|tstamp             |destination  |
+-------+--------+-------------------+-------------+
|1000000|A       |2020-07-21 22:52:38|Boston       |
|1000001|B       |2020-07-21 22:52:38|Boston       |
|1000002|C       |2020-07-21 22:52:38|Boston       |
|1000003|D       |2020-07-21 22:52:38|Boston       |
|1000004|E       |2020-07-21 22:52:38|Boston       |
|1000005|F       |2020-07-21 22:52:38|Boston       |
|1000006|G       |2020-07-21 22:52:38|Boston       |
|1000007|H       |2020-07-21 22:52:38|Boston       |
|1000008|I       |2020-07-21 22:52:38|Boston       |
|1000009|J       |2020-07-21 22:52:38|Boston       |
|1000010|A       |2020-07-21 22:47:17|Seattle      |
|1000011|B       |2020-07-21 22:47:17|New York     |
|1000012|C       |2020-07-21 22:47:17|New Jersey   |
|1000013|D       |2020-07-21 22:47:17|Los Angeles  |
|999996 |G       |2020-07-21 22:47:17|Washington DC|
|999997 |H       |2020-07-21 22:47:17|Philadel

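Now let's check the files in S3 after the upsert commit. A new version of one data file (same file ID, new commit time) appears next to the three original files: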

```
$ aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_trips_table/ --summarize --human-readable
                           PRE .hoodie/
2020-04-28 23:11:39    0 Bytes .hoodie_$folder$
2020-04-28 23:11:52   93 Bytes .hoodie_partition_metadata
2020-04-28 23:15:43    4.8 MiB 0eeafaf2-0110-4056-b509-724b11a4c0b6-0_0-51-609_20200428231528.parquet
2020-04-28 23:11:59    4.8 MiB 0eeafaf2-0110-4056-b509-724b11a4c0b6-0_0-7-67_20200428231141.parquet
2020-04-28 23:11:59    4.4 MiB 1ce9cbac-56e4-477e-a2bc-33fe62b3550f-0_1-7-68_20200428231141.parquet
2020-04-28 23:11:59    4.6 MiB 579443dc-8f56-4990-bed6-a527f21e9682-0_2-7-69_20200428231141.parquet

Total Objects: 6
   Total Size: 18.6 MiB

```

In [20]:
%%local
!aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_trips_table/ --summarize --human-readable

                           PRE .hoodie/
2020-07-21 22:48:02    0 Bytes .hoodie_$folder$
2020-07-21 22:48:22   93 Bytes .hoodie_partition_metadata
2020-07-21 22:48:32    5.0 MiB 4b61fb70-cde1-488b-abc7-c1c2f85c0092-0_1-8-12_20200721224758.parquet
2020-07-21 22:53:03    4.3 MiB 7790f62b-531e-47cb-8b0f-9eb826e0f7fb-0_0-51-260_20200721225251.parquet
2020-07-21 22:48:31    4.3 MiB 7790f62b-531e-47cb-8b0f-9eb826e0f7fb-0_0-8-11_20200721224758.parquet
2020-07-21 22:48:35    4.5 MiB 7b0c13f2-6017-4c92-981d-5675559dbc97-0_2-8-13_20200721224758.parquet

Total Objects: 6
   Total Size: 18.1 MiB


Now that we have upserted some records, let's insert 10 new records into the table. We use the same upsert operation: because the primary keys 2000000 to 2000009 do not exist in the table yet, the records are inserted.

In [21]:
insert_dest = ["Syracuse"] * 10
df5 = create_json_df(spark, get_json_data(2000000, 10, insert_dest))
df5.count()
df5.show()


+-----------+--------+-------+-------------------+
|destination|route_id|trip_id|             tstamp|
+-----------+--------+-------+-------------------+
|   Syracuse|       A|2000000|2020-07-21 22:54:55|
|   Syracuse|       B|2000001|2020-07-21 22:54:55|
|   Syracuse|       C|2000002|2020-07-21 22:54:55|
|   Syracuse|       D|2000003|2020-07-21 22:54:55|
|   Syracuse|       E|2000004|2020-07-21 22:54:55|
|   Syracuse|       F|2000005|2020-07-21 22:54:55|
|   Syracuse|       G|2000006|2020-07-21 22:54:55|
|   Syracuse|       H|2000007|2020-07-21 22:54:55|
|   Syracuse|       I|2000008|2020-07-21 22:54:55|
|   Syracuse|       J|2000009|2020-07-21 22:54:55|
+-----------+--------+-------+-------------------+

In [22]:
(df5.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)  
      .mode("Append")
      .save(config['target']))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
df6=spark.read.format(HUDI_FORMAT).load(config["target"]+"/*")
df6.count()


2000010

In [24]:
spark.sql("select trip_id, route_id, tstamp, destination from "+config['table_name'] +" where trip_id > 1999996").show(50,False)


+-------+--------+-------------------+-------------+
|trip_id|route_id|tstamp             |destination  |
+-------+--------+-------------------+-------------+
|1999997|H       |2020-07-21 22:47:17|Philadelphia |
|1999998|I       |2020-07-21 22:47:17|Miami        |
|1999999|J       |2020-07-21 22:47:17|San Francisco|
|2000009|J       |2020-07-21 22:54:55|Syracuse     |
|2000008|I       |2020-07-21 22:54:55|Syracuse     |
|2000007|H       |2020-07-21 22:54:55|Syracuse     |
|2000006|G       |2020-07-21 22:54:55|Syracuse     |
|2000001|B       |2020-07-21 22:54:55|Syracuse     |
|2000000|A       |2020-07-21 22:54:55|Syracuse     |
|2000005|F       |2020-07-21 22:54:55|Syracuse     |
|2000004|E       |2020-07-21 22:54:55|Syracuse     |
|2000003|D       |2020-07-21 22:54:55|Syracuse     |
|2000002|C       |2020-07-21 22:54:55|Syracuse     |
+-------+--------+-------------------+-------------+

We can observe that the 10 new records have been inserted.

## Deleting Records

Apache Hudi supports two types of deletes on data stored in Hudi datasets, by letting the user specify a different record payload implementation.

* **Soft Deletes**: With soft deletes, the user retains the key but nulls out the values of all other fields. This can be achieved by ensuring the appropriate fields are nullable in the dataset schema and then upserting the dataset after setting these fields to null.

* **Hard Deletes**: A stronger form of delete that physically removes any trace of the record from the dataset.
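The soft-delete recipe can be modeled on a single record as a dict. `soft_delete` below is a hypothetical helper: it returns a copy in which every field except the ones in `keep` is `None`. Keeping the precombine field (`tstamp`) alongside the record key is an assumption borrowed from the soft-delete example in the Hudi quickstart, not something this notebook states. In PySpark, the same transformation could be expressed by selecting `lit(None).cast("string")` for the nulled columns before running a normal upsert.

```python
def soft_delete(record, keep=("trip_id", "tstamp")):
    """Hypothetical helper: copy `record`, setting every field not in `keep` to None.

    Upserting records shaped like this keeps their keys in the table while
    nulling out the remaining values (a soft delete).
    """
    return {name: (value if name in keep else None) for name, value in record.items()}
```

A hard delete, by contrast, writes the same keys with the empty payload class, so the records disappear from the table entirely.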

Let's now run a hard delete, which physically removes the records from our dataset.

Let's delete the 10 records with the "Syracuse" destination that we added to the table. Note that the only change from the previous upsert is the single line that sets `hoodie.datasource.write.payload.class` to `org.apache.hudi.common.model.EmptyHoodieRecordPayload`, which deletes the records:

```
.option(PAYLOAD_CLASS_OPT_KEY, EMPTY_PAYLOAD_CLASS_OPT_VAL)
```

In [25]:
df5.show()


+-----------+--------+-------+-------------------+
|destination|route_id|trip_id|             tstamp|
+-----------+--------+-------+-------------------+
|   Syracuse|       A|2000000|2020-07-21 22:54:55|
|   Syracuse|       B|2000001|2020-07-21 22:54:55|
|   Syracuse|       C|2000002|2020-07-21 22:54:55|
|   Syracuse|       D|2000003|2020-07-21 22:54:55|
|   Syracuse|       E|2000004|2020-07-21 22:54:55|
|   Syracuse|       F|2000005|2020-07-21 22:54:55|
|   Syracuse|       G|2000006|2020-07-21 22:54:55|
|   Syracuse|       H|2000007|2020-07-21 22:54:55|
|   Syracuse|       I|2000008|2020-07-21 22:54:55|
|   Syracuse|       J|2000009|2020-07-21 22:54:55|
+-----------+--------+-------+-------------------+

In [26]:
(df5.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
      .option(PAYLOAD_CLASS_OPT_KEY, EMPTY_PAYLOAD_CLASS_OPT_VAL)
      .mode("Append")
      .save(config['target']))


In [27]:
spark.sql("select trip_id, route_id, tstamp, destination from "+config['table_name'] +" where trip_id > 1999996").show(20,False)


+-------+--------+-------------------+-------------+
|trip_id|route_id|tstamp             |destination  |
+-------+--------+-------------------+-------------+
|1999997|H       |2020-07-21 22:47:17|Philadelphia |
|1999998|I       |2020-07-21 22:47:17|Miami        |
|1999999|J       |2020-07-21 22:47:17|San Francisco|
+-------+--------+-------------------+-------------+

We can observe that the records with `trip_id` >= 2000000 no longer exist in our table.

Let's observe the number of files in S3. Expected: 6 data files (3 initial files + one each for the upsert, the insert, and the delete), or 8 objects in total including the two `.hoodie` metadata objects:

```

$ aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_trips_table/ --summarize --human-readable
                           PRE .hoodie/
2020-04-28 23:11:39    0 Bytes .hoodie_$folder$
2020-04-28 23:11:52   93 Bytes .hoodie_partition_metadata
2020-04-28 23:15:43    4.8 MiB 0eeafaf2-0110-4056-b509-724b11a4c0b6-0_0-51-609_20200428231528.parquet
2020-04-28 23:11:59    4.8 MiB 0eeafaf2-0110-4056-b509-724b11a4c0b6-0_0-7-67_20200428231141.parquet
2020-04-28 23:18:24    4.4 MiB 1ce9cbac-56e4-477e-a2bc-33fe62b3550f-0_0-135-1376_20200428231814.parquet
2020-04-28 23:17:17    4.4 MiB 1ce9cbac-56e4-477e-a2bc-33fe62b3550f-0_0-93-1007_20200428231705.parquet
2020-04-28 23:11:59    4.4 MiB 1ce9cbac-56e4-477e-a2bc-33fe62b3550f-0_1-7-68_20200428231141.parquet
2020-04-28 23:11:59    4.6 MiB 579443dc-8f56-4990-bed6-a527f21e9682-0_2-7-69_20200428231141.parquet

Total Objects: 8
   Total Size: 27.5 MiB

```

In our example, we set the number of commits to retain to 10. Because each write in this example rewrites a single data file, roughly 10 new file versions at most can accumulate on top of the bulk insert files, i.e., about 13 data files in total. If we had set `commits_to_retain` to 2, the number of data files would not grow beyond roughly the initial files (3) + commits_to_retain (2) = 5. This is because the Hudi cleaner (here with the `KEEP_LATEST_COMMITS` policy) deletes older file versions during writes, based on the commit retention setting.

In [28]:
%%local
!aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_trips_table/ --summarize --human-readable

                           PRE .hoodie/
2020-07-21 22:48:02    0 Bytes .hoodie_$folder$
2020-07-21 22:48:22   93 Bytes .hoodie_partition_metadata
2020-07-21 22:58:22    5.0 MiB 4b61fb70-cde1-488b-abc7-c1c2f85c0092-0_0-131-702_20200721225811.parquet
2020-07-21 22:57:11    5.0 MiB 4b61fb70-cde1-488b-abc7-c1c2f85c0092-0_0-91-482_20200721225701.parquet
2020-07-21 22:48:32    5.0 MiB 4b61fb70-cde1-488b-abc7-c1c2f85c0092-0_1-8-12_20200721224758.parquet
2020-07-21 22:53:03    4.3 MiB 7790f62b-531e-47cb-8b0f-9eb826e0f7fb-0_0-51-260_20200721225251.parquet
2020-07-21 22:48:31    4.3 MiB 7790f62b-531e-47cb-8b0f-9eb826e0f7fb-0_0-8-11_20200721224758.parquet
2020-07-21 22:48:35    4.5 MiB 7b0c13f2-6017-4c92-981d-5675559dbc97-0_2-8-13_20200721224758.parquet

Total Objects: 8
   Total Size: 28.2 MiB
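To see the retention behavior per file, the listing can be grouped by file ID. In the listing above, each upsert rewrote a whole Parquet file under the same file ID prefix with a new commit time, so the table holds three file groups with 3, 2, and 1 versions (6 data files). `file_versions` is a hypothetical helper that assumes the base-file naming `<fileId>_<writeToken>_<instantTime>.parquet` seen in these listings.

```python
from collections import defaultdict


def file_versions(listing):
    """Group Hudi base files from `aws s3 ls` output by file ID (hypothetical helper).

    Assumes file names of the form <fileId>_<writeToken>_<instantTime>.parquet.
    Returns {fileId: [instantTime, ...]} with instants sorted oldest first.
    """
    groups = defaultdict(list)
    for line in listing.splitlines():
        parts = line.split()
        if not parts or not parts[-1].endswith(".parquet"):
            continue  # skip the PRE .hoodie/ prefix, metadata objects, and summary lines
        file_id, _write_token, instant = parts[-1][: -len(".parquet")].split("_")
        groups[file_id].append(instant)
    return {file_id: sorted(instants) for file_id, instants in groups.items()}
```

Passing it the text printed by the `aws s3 ls` command above makes it easy to check that no file group keeps more versions than the retention setting allows.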


## Rollback

Let's say we want to roll back the last delete we made. On the EMR cluster, start the Hudi CLI, connect to the table, and list its commits:

```
/usr/lib/hudi/cli/bin/hudi-cli.sh

connect --path s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_trips_table/

commits show
```

Sample output from an earlier run of this demo:

```
hudi:hudi_trips_table->commits show
20/04/28 23:46:09 INFO s3n.S3NativeFileSystem: Opening 's3://<Your S3 Bucket Here>/tmp/hudi/hudi_trips_table/.hoodie/20200428231705.commit' for reading
20/04/28 23:46:09 INFO s3n.S3NativeFileSystem: Opening 's3://<Your S3 Bucket Here>/tmp/hudi/hudi_trips_table/.hoodie/20200428231528.commit' for reading
20/04/28 23:46:09 INFO s3n.S3NativeFileSystem: Opening 's3://<Your S3 Bucket Here>/tmp/hudi/hudi_trips_table/.hoodie/20200428231141.commit' for reading
╔════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime     │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠════════════════╪═════════════════════╪═══════════════════╪═════════════════════╪══════════════════════════╪═══════════════════════╪══════════════════════════════╪══════════════╣
║ 20200428231814 │ 4.4 MB              │ 0                 │ 1                   │ 1                        │ 642132                │ 0                            │ 0            ║
╟────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20200428231528 │ 4.8 MB              │ 0                 │ 1                   │ 1                        │ 697329                │ 10                           │ 0            ║
╟────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20200428231141 │ 13.8 MB             │ 3                 │ 0                   │ 1                        │ 2000000               │ 0                            │ 0            ║
╚════════════════╧═════════════════════╧═══════════════════╧═════════════════════╧══════════════════════════╧═══════════════════════╧══════════════════════════════╧══════════════╝
```

Then roll back the delete commit, using the commit time of the delete in your own table:

```
commit rollback --commit 20200428231814
```
    
Now let's check what happened to the records we deleted earlier.

In [29]:
spark.sql("select trip_id, route_id, tstamp, destination from "+config['table_name'] +" where trip_id > 1999996").show(20,False)


+-------+--------+-------------------+-------------+
|trip_id|route_id|tstamp             |destination  |
+-------+--------+-------------------+-------------+
|1999997|H       |2020-07-21 22:47:17|Philadelphia |
|1999998|I       |2020-07-21 22:47:17|Miami        |
|1999999|J       |2020-07-21 22:47:17|San Francisco|
|2000009|J       |2020-07-21 22:54:55|Syracuse     |
|2000008|I       |2020-07-21 22:54:55|Syracuse     |
|2000007|H       |2020-07-21 22:54:55|Syracuse     |
|2000006|G       |2020-07-21 22:54:55|Syracuse     |
|2000001|B       |2020-07-21 22:54:55|Syracuse     |
|2000000|A       |2020-07-21 22:54:55|Syracuse     |
|2000005|F       |2020-07-21 22:54:55|Syracuse     |
|2000004|E       |2020-07-21 22:54:55|Syracuse     |
|2000003|D       |2020-07-21 22:54:55|Syracuse     |
|2000002|C       |2020-07-21 22:54:55|Syracuse     |
+-------+--------+-------------------+-------------+

### Time Travel with Hudi 

Now, let us time travel with Hudi (query previous commits) with incremental and point-in-time queries

First, let's look at incremental queries.

On the EMR cluster, open the Hudi CLI and check what the commits look like right now:

/usr/lib/hudi/cli/bin/hudi-cli.sh

connect --path s3://[YOUR-S3-BUCKET]/demos/hudi/hudi_trips_table/

commits show

In [30]:
spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_table order by commitTime").show(20, False)

+--------------+
|commitTime    |
+--------------+
|20200721224758|
|20200721225251|
|20200721225701|
+--------------+

In [31]:
commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_table order by commitTime").collect()
print("commits: ")
for elem in commits: print (elem) 

beginTime = commits[-3][0] # commit time we are interested in
print("begin time: " + beginTime)

# incrementally query data
incViewDF = spark.read.format("org.apache.hudi") \
                   .option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL) \
                   .option(BEGIN_INSTANTTIME_OPT_KEY, beginTime) \
                   .load(config["target"]) 

incViewDF.show(5)
incViewDF.createOrReplaceTempView("hudi_incr_table")  # registerTempTable is deprecated

commits: 
Row(commitTime='20200721224758')
Row(commitTime='20200721225251')
Row(commitTime='20200721225701')
begin time: 20200721224758
+-------------------+--------------------+------------------+----------------------+--------------------+-----------+--------+-------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|destination|route_id|trip_id|             tstamp|
+-------------------+--------------------+------------------+----------------------+--------------------+-----------+--------+-------+-------------------+
|     20200721225701|20200721225701_0_...|           2000009|                      |4b61fb70-cde1-488...|   Syracuse|       J|2000009|2020-07-21 22:54:55|
|     20200721225701|20200721225701_0_...|           2000008|                      |4b61fb70-cde1-488...|   Syracuse|       I|2000008|2020-07-21 22:54:55|
|     20200721225701|20200721225701_0_...|           2000007|                      |4b61f

In [32]:
spark.sql("select `_hoodie_commit_time`, trip_id, route_id, destination, tstamp from  hudi_incr_table where trip_id between 999996 and 1000013").show()

+-------------------+-------+--------+-----------+-------------------+
|_hoodie_commit_time|trip_id|route_id|destination|             tstamp|
+-------------------+-------+--------+-----------+-------------------+
|     20200721225251|1000000|       A|     Boston|2020-07-21 22:52:38|
|     20200721225251|1000001|       B|     Boston|2020-07-21 22:52:38|
|     20200721225251|1000002|       C|     Boston|2020-07-21 22:52:38|
|     20200721225251|1000003|       D|     Boston|2020-07-21 22:52:38|
|     20200721225251|1000004|       E|     Boston|2020-07-21 22:52:38|
|     20200721225251|1000005|       F|     Boston|2020-07-21 22:52:38|
|     20200721225251|1000006|       G|     Boston|2020-07-21 22:52:38|
|     20200721225251|1000007|       H|     Boston|2020-07-21 22:52:38|
|     20200721225251|1000008|       I|     Boston|2020-07-21 22:52:38|
|     20200721225251|1000009|       J|     Boston|2020-07-21 22:52:38|
+-------------------+-------+--------+-----------+-------------------+
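
Only the Boston rows from the second commit come back: trips 999996 to 999999 and 1000010 to 1000013 were last written by the first commit, which the window excludes. The plain-Python sketch below models which commits an incremental query covers, assuming (as the `beginTime`/`endTime` usage and outputs in this notebook suggest) that the begin instant is exclusive and the optional end instant is inclusive. The helper `commits_in_window` is hypothetical and only filters commit-time strings; it does not call Hudi.

```python
from typing import List, Optional


def commits_in_window(commit_times: List[str], begin: str,
                      end: Optional[str] = None) -> List[str]:
    """Return the commit instants an incremental read would cover.

    The instant times here are fixed-width yyyyMMddHHmmss strings, so plain
    string comparison orders them chronologically.
    """
    return sorted(
        t for t in commit_times
        if t > begin and (end is None or t <= end)  # begin exclusive, end inclusive
    )


commits = ["20200721224758", "20200721225251", "20200721225701"]

# In [31]: beginTime = first commit -> everything after it
print(commits_in_window(commits, commits[-3]))
# ['20200721225251', '20200721225701']

# In [34]: beginTime = "000", endTime = first commit -> only the first commit
print(commits_in_window(commits, "000", commits[-3]))
# ['20200721224758']
```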

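Now, let's look at point-in-time queries, i.e., reading the table as of a specific commit. A point-in-time query is an incremental query whose `beginTime` is "000" (before every commit) and whose `endTime` is the commit of interest.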

In [33]:
commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_table order by commitTime").collect()
print("commits: ")
for elem in commits: print (elem) 

commits: 
Row(commitTime='20200721224758')
Row(commitTime='20200721225251')
Row(commitTime='20200721225701')

In [34]:
# Let's check the first state, i.e., before we updated the trips to Boston

beginTime = "000"  # represents all commits > this time
endTime = commits[-3][0]  # first commit
print("end time: " + endTime)

# point-in-time query: an incremental read bounded by endTime
incViewDF = spark.read.format("org.apache.hudi") \
                   .option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL) \
                   .option(BEGIN_INSTANTTIME_OPT_KEY, beginTime) \
                   .option(END_INSTANTTIME_OPT_KEY, endTime) \
                   .load(config["target"])

incViewDF.show(5)
incViewDF.createOrReplaceTempView("hudi_incr_table")  # registerTempTable is deprecated

end time: 20200721224758
+-------------------+--------------------+------------------+----------------------+--------------------+-------------+--------+-------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|  destination|route_id|trip_id|             tstamp|
+-------------------+--------------------+------------------+----------------------+--------------------+-------------+--------+-------+-------------------+
|     20200721224758|  20200721224758_1_1|           1552877|                      |4b61fb70-cde1-488...| Philadelphia|       H|1552877|2020-07-21 22:47:17|
|     20200721224758|  20200721224758_1_2|           1552878|                      |4b61fb70-cde1-488...|        Miami|       I|1552878|2020-07-21 22:47:17|
|     20200721224758|  20200721224758_1_3|           1552879|                      |4b61fb70-cde1-488...|San Francisco|       J|1552879|2020-07-21 22:47:17|
|     20200721224758|  2020072122

In [35]:
spark.sql("select `_hoodie_commit_time`, trip_id, route_id, destination, tstamp from  hudi_incr_table where trip_id between 999996 and 1000013").show()

+-------------------+-------+--------+-------------+-------------------+
|_hoodie_commit_time|trip_id|route_id|  destination|             tstamp|
+-------------------+-------+--------+-------------+-------------------+
|     20200721224758| 999996|       G|Washington DC|2020-07-21 22:47:17|
|     20200721224758| 999997|       H| Philadelphia|2020-07-21 22:47:17|
|     20200721224758| 999998|       I|        Miami|2020-07-21 22:47:17|
|     20200721224758| 999999|       J|San Francisco|2020-07-21 22:47:17|
|     20200721224758|1000000|       A|      Seattle|2020-07-21 22:47:17|
|     20200721224758|1000001|       B|     New York|2020-07-21 22:47:17|
|     20200721224758|1000002|       C|   New Jersey|2020-07-21 22:47:17|
|     20200721224758|1000003|       D|  Los Angeles|2020-07-21 22:47:17|
|     20200721224758|1000004|       E|    Las Vegas|2020-07-21 22:47:17|
|     20200721224758|1000005|       F|       Tucson|2020-07-21 22:47:17|
|     20200721224758|1000006|       G|Washington DC

The records 1000000 to 1000009 now show the first state of the table, before we upserted their destination to Boston.
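
To query the table as of a wall-clock time rather than a known commit, you need the latest commit instant at or before that time to use as `endTime`. A minimal plain-Python sketch, assuming the instant times keep the 14-digit yyyyMMddHHmmss form seen above; `instant_to_datetime` and `end_instant_as_of` are hypothetical helpers:

```python
from datetime import datetime
from typing import List, Optional

INSTANT_FORMAT = "%Y%m%d%H%M%S"  # instant times in this notebook: yyyyMMddHHmmss


def instant_to_datetime(instant: str) -> datetime:
    """Parse an instant time string such as '20200721224758'."""
    return datetime.strptime(instant, INSTANT_FORMAT)


def end_instant_as_of(commit_times: List[str], as_of: datetime) -> Optional[str]:
    """Latest commit whose instant time is at or before `as_of`, else None."""
    candidates = [t for t in commit_times if instant_to_datetime(t) <= as_of]
    return max(candidates) if candidates else None


commits = ["20200721224758", "20200721225251", "20200721225701"]
print(instant_to_datetime(commits[0]))                            # 2020-07-21 22:47:58
print(end_instant_as_of(commits, datetime(2020, 7, 21, 22, 50)))  # 20200721224758
print(end_instant_as_of(commits, datetime(2020, 7, 21, 22, 0)))   # None
```

The returned instant can then be passed as `endTime`, together with `beginTime = "000"`, exactly as in the cell above.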

### Merge On Read 

The default table type is Copy-On-Write (COW), which is best suited for read-heavy workloads with modest write volumes. On each write, COW rewrites the affected Parquet files, merging the original data with the new changes. This increases write latency but keeps reads fast, because queries only ever scan Parquet files.

For near real-time applications that need fast upserts, the MERGE_ON_READ (MOR) table type is better suited. A MOR table appends incoming upserts for each file group to a row-based delta log (in Avro format). Queries on the real-time view merge this log with the existing Parquet base file on the fly, and a compaction periodically merges the log into a new Parquet base file.
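
The difference between the read-optimized and real-time views can be illustrated with a toy, in-memory model of a single MOR file group. This is a simplified sketch for intuition only, not how Hudi is implemented: the class and method names are hypothetical, and a later log entry simply overwrites any earlier version of the same key.

```python
from typing import Dict, List, Tuple

# A "record" in this toy model is (trip_id, destination, tstamp).
Record = Tuple[int, str, str]


class ToyMorFileGroup:
    """Toy model of one MOR file group, for intuition only (not Hudi code)."""

    def __init__(self, base: List[Record]) -> None:
        self.base: Dict[int, Record] = {r[0]: r for r in base}  # stands in for the base Parquet file
        self.log: List[Record] = []  # stands in for the row-based delta log

    def upsert(self, records: List[Record]) -> None:
        # Writes stay cheap: new versions are only appended to the log.
        self.log.extend(records)

    def _merged(self) -> Dict[int, Record]:
        merged = dict(self.base)
        for rec in self.log:
            # Simplification: a later log entry replaces any earlier version of the key.
            merged[rec[0]] = rec
        return merged

    def read_optimized(self) -> List[Record]:
        # Base file only, so updates still sitting in the log are not visible.
        return sorted(self.base.values())

    def real_time(self) -> List[Record]:
        # Base file merged with the log at query time.
        return sorted(self._merged().values())

    def compact(self) -> None:
        # Fold the log into a new base file and start a fresh log.
        self.base = self._merged()
        self.log = []


fg = ToyMorFileGroup([(1000000, "Seattle", "2020-07-21 23:12:05")])
fg.upsert([(1000000, "San Diego", "2020-07-21 23:14:32")])
print(fg.read_optimized())  # [(1000000, 'Seattle', '2020-07-21 23:12:05')]
print(fg.real_time())       # [(1000000, 'San Diego', '2020-07-21 23:14:32')]
fg.compact()
print(fg.read_optimized())  # [(1000000, 'San Diego', '2020-07-21 23:14:32')]
```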

In [36]:
## CHANGE ME ##
config = {
    "table_name": "hudi_mor_trips_table",
    "table_name_rt": "hudi_mor_trips_table_rt",
    "target": "s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_mor_trips_table",
    "primary_key": "trip_id",
    "sort_key": "tstamp",
    "commits_to_retain": "2",
}

In [37]:
STORAGE_TYPE_OPT_KEY="hoodie.datasource.write.storage.type"
COMPACTION_INLINE_OPT_KEY="hoodie.compact.inline"
COMPACTION_MAX_DELTA_COMMITS_OPT_KEY="hoodie.compact.inline.max.delta.commits"

In [38]:
mor_dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson","Washington DC","Philadelphia","Miami","San Francisco"]
df2 = create_json_df(spark, get_json_data(0, 2000000, mor_dest))
df2.show()

+-------------+--------+-------+-------------------+
|  destination|route_id|trip_id|             tstamp|
+-------------+--------+-------+-------------------+
|      Seattle|       A|      0|2020-07-21 23:12:05|
|     New York|       B|      1|2020-07-21 23:12:05|
|   New Jersey|       C|      2|2020-07-21 23:12:05|
|  Los Angeles|       D|      3|2020-07-21 23:12:05|
|    Las Vegas|       E|      4|2020-07-21 23:12:05|
|       Tucson|       F|      5|2020-07-21 23:12:05|
|Washington DC|       G|      6|2020-07-21 23:12:05|
| Philadelphia|       H|      7|2020-07-21 23:12:05|
|        Miami|       I|      8|2020-07-21 23:12:05|
|San Francisco|       J|      9|2020-07-21 23:12:05|
|      Seattle|       A|     10|2020-07-21 23:12:05|
|     New York|       B|     11|2020-07-21 23:12:05|
|   New Jersey|       C|     12|2020-07-21 23:12:05|
|  Los Angeles|       D|     13|2020-07-21 23:12:05|
|    Las Vegas|       E|     14|2020-07-21 23:12:05|
|       Tucson|       F|     15|2020-07-21 23:

Because this is the first write, the bulk insert takes about the same time as it did for the Copy-On-Write table.

In [39]:
(df2.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option(BULK_INSERT_PARALLELISM, 3)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
      .option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ")
      .option(COMPACTION_INLINE_OPT_KEY, "false")
      .option(COMPACTION_MAX_DELTA_COMMITS_OPT_KEY, "20")
      .mode("Overwrite")
      .save(config['target']))

### Verify the number of files 

Let us check the contents of the S3 path. A bulk insert performs the same on Copy-On-Write and Merge-On-Read tables: both write plain Parquet base files, here three of them, matching the bulk insert parallelism of 3.

```
$ aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_mor_trips_table/
                           PRE .hoodie/
2020-04-28 23:30:20          0 .hoodie_$folder$
2020-04-28 23:30:26         93 .hoodie_partition_metadata
2020-04-28 23:30:33    4378000 45b1ce07-f9ac-496d-8b03-20af011a0c44-0_1-194-3566_20200428233020.parquet
2020-04-28 23:30:34    5048941 932d5e97-c5f0-4c91-a7f6-f65d487a5e2b-0_2-194-3567_20200428233020.parquet
2020-04-28 23:30:34    5065824 ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_0-194-3565_20200428233020.parquet
```

Notice that the write is recorded on the timeline as a `deltacommit` rather than a `commit`:

```
$ aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_mor_trips_table/.hoodie/
2020-04-28 23:30:21          0 .aux_$folder$
2020-04-28 23:30:21          0 .temp_$folder$
2020-04-28 23:30:37       1077 20200428233020.clean
2020-04-28 23:30:36       4929 20200428233020.deltacommit
2020-04-28 23:30:21          0 archived_$folder$
2020-04-28 23:30:21        264 hoodie.properties
```

Here are the same listings after the first commit of this notebook run:

In [40]:
%%local
!aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_mor_trips_table/

                           PRE .hoodie/
2020-07-21 23:12:24          0 .hoodie_$folder$
2020-07-21 23:12:43         93 .hoodie_partition_metadata
2020-07-21 23:12:49    3585873 45e39d48-6ed2-45cb-81b8-ffb8595a80c4-0_1-181-852_20200721231223.parquet
2020-07-21 23:12:57    5628093 57143caf-e2ec-4989-962d-9137fb840b75-0_2-181-853_20200721231223.parquet
2020-07-21 23:12:52    5264997 9ac08b59-9f77-4ac6-bc91-38eeb193a864-0_0-181-851_20200721231223.parquet


In [41]:
%%local
!aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_mor_trips_table/.hoodie/

2020-07-21 23:12:25          0 .aux_$folder$
2020-07-21 23:12:25          0 .temp_$folder$
2020-07-21 23:12:59       1130 20200721231223.clean
2020-07-21 23:12:59        835 20200721231223.clean.inflight
2020-07-21 23:12:59        835 20200721231223.clean.requested
2020-07-21 23:12:58       4202 20200721231223.deltacommit
2020-07-21 23:12:33          0 20200721231223.deltacommit.inflight
2020-07-21 23:12:27          0 20200721231223.deltacommit.requested
2020-07-21 23:12:24          0 archived_$folder$
2020-07-21 23:12:25        310 hoodie.properties
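
Each action moves through `requested`, `inflight` and completed states on the timeline, and the MOR write shows up as a `deltacommit`. A small plain-Python sketch that groups the `.hoodie` listing above by instant and action; the file-name layout it parses is inferred from this listing rather than taken from Hudi's source, and both helpers are hypothetical:

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def parse_timeline_file(name: str) -> Tuple[str, str, str]:
    """Split a timeline file name into (instant, action, state).

    Assumed layout, matching the listing above: <instant>.<action>[.<state>],
    where a missing state suffix means the action has completed.
    """
    parts = name.split(".")
    instant, action = parts[0], parts[1]
    state = parts[2] if len(parts) > 2 else "completed"
    return instant, action, state


def timeline_summary(names: List[str]) -> Dict[Tuple[str, str], List[str]]:
    """Group timeline files by (instant, action), collecting their states."""
    summary: Dict[Tuple[str, str], List[str]] = defaultdict(list)
    for name in names:
        # Instant files start with a digit; skip folder markers and hoodie.properties.
        if not name[:1].isdigit():
            continue
        instant, action, state = parse_timeline_file(name)
        summary[(instant, action)].append(state)
    return dict(summary)


listing = [
    ".aux_$folder$", ".temp_$folder$",
    "20200721231223.clean", "20200721231223.clean.inflight",
    "20200721231223.clean.requested", "20200721231223.deltacommit",
    "20200721231223.deltacommit.inflight", "20200721231223.deltacommit.requested",
    "archived_$folder$", "hoodie.properties",
]
for (instant, action), states in sorted(timeline_summary(listing).items()):
    print(instant, action, sorted(states))
# 20200721231223 clean ['completed', 'inflight', 'requested']
# 20200721231223 deltacommit ['completed', 'inflight', 'requested']
```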


Now let's upsert some records into the MOR table:

In [42]:
upsert_dest = ["San Diego", "San Diego", "San Diego", "San Diego", "San Diego","San Diego","San Diego","San Diego","San Diego","San Diego"]
df3 = create_json_df(spark, get_json_data(1000000, 10, upsert_dest))
df3.count()
df3.show()

+-----------+--------+-------+-------------------+
|destination|route_id|trip_id|             tstamp|
+-----------+--------+-------+-------------------+
|  San Diego|       A|1000000|2020-07-21 23:14:32|
|  San Diego|       B|1000001|2020-07-21 23:14:32|
|  San Diego|       C|1000002|2020-07-21 23:14:32|
|  San Diego|       D|1000003|2020-07-21 23:14:32|
|  San Diego|       E|1000004|2020-07-21 23:14:32|
|  San Diego|       F|1000005|2020-07-21 23:14:32|
|  San Diego|       G|1000006|2020-07-21 23:14:32|
|  San Diego|       H|1000007|2020-07-21 23:14:32|
|  San Diego|       I|1000008|2020-07-21 23:14:32|
|  San Diego|       J|1000009|2020-07-21 23:14:32|
+-----------+--------+-------+-------------------+

In [43]:
(df3.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 20)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
      .option(HUDI_COMMITS_RETAINED,config["commits_to_retain"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
      .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)  
      .option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ")
      .option(COMPACTION_INLINE_OPT_KEY, "false")
      .option(COMPACTION_MAX_DELTA_COMMITS_OPT_KEY, "20")
      .mode("Append")
      .save(config['target']))
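
The bulk insert and upsert cells repeat most of the same MOR options. One way to avoid that repetition is to build the shared options as a plain dict and pass it with `DataFrameWriter.options(**opts)`. This is a sketch, not exhaustive: it assumes the raw string keys below are the values behind the constants used in this notebook (`TABLE_NAME`, `RECORDKEY_FIELD_OPT_KEY`, `PRECOMBINE_FIELD_OPT_KEY`, `OPERATION_OPT_KEY`) for the Hudi version on your cluster, so check them against your version. `mor_write_options` is a hypothetical helper.

```python
from typing import Dict


def mor_write_options(config: Dict[str, str], operation: str) -> Dict[str, str]:
    """Shared MOR write options as raw string keys (a sketch, not exhaustive).

    Hive sync, key generator, cleaner and parallelism options used in the
    cells above are left out; add them as needed.
    """
    return {
        "hoodie.table.name": config["table_name"],
        "hoodie.datasource.write.recordkey.field": config["primary_key"],
        "hoodie.datasource.write.precombine.field": config["sort_key"],
        "hoodie.datasource.write.operation": operation,  # e.g. "bulk_insert" or "upsert"
        # The three keys below are the values of the constants defined in In [37].
        "hoodie.datasource.write.storage.type": "MERGE_ON_READ",
        "hoodie.compact.inline": "false",
        "hoodie.compact.inline.max.delta.commits": "20",
    }


cfg = {"table_name": "hudi_mor_trips_table", "primary_key": "trip_id", "sort_key": "tstamp"}
opts = mor_write_options(cfg, "upsert")
print(opts["hoodie.datasource.write.operation"])  # upsert

# In the notebook this could be used as (not executed here):
# df3.write.format(HUDI_FORMAT).options(**opts).mode("Append").save(config["target"])
```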

In [44]:
spark.sql("select trip_id, route_id, destination, tstamp from "+config['table_name']+ "_ro where trip_id between 999996 and 1000010").show(20,False)

+-------+--------+-------------+-------------------+
|trip_id|route_id|destination  |tstamp             |
+-------+--------+-------------+-------------------+
|1000000|A       |Seattle      |2020-07-21 23:12:05|
|1000001|B       |New York     |2020-07-21 23:12:05|
|1000002|C       |New Jersey   |2020-07-21 23:12:05|
|1000003|D       |Los Angeles  |2020-07-21 23:12:05|
|1000004|E       |Las Vegas    |2020-07-21 23:12:05|
|1000005|F       |Tucson       |2020-07-21 23:12:05|
|1000006|G       |Washington DC|2020-07-21 23:12:05|
|1000007|H       |Philadelphia |2020-07-21 23:12:05|
|1000008|I       |Miami        |2020-07-21 23:12:05|
|1000009|J       |San Francisco|2020-07-21 23:12:05|
|1000010|A       |Seattle      |2020-07-21 23:12:05|
|999996 |G       |Washington DC|2020-07-21 23:12:05|
|999997 |H       |Philadelphia |2020-07-21 23:12:05|
|999998 |I       |Miami        |2020-07-21 23:12:05|
|999999 |J       |San Francisco|2020-07-21 23:12:05|
+-------+--------+-------------+-------------------+

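The read-optimized (`_ro`) table still shows the original destinations, because it reads only the Parquet base files and ignores the delta log. Now let's query the real-time (`_rt`) table, which merges the delta log at query time: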

In [45]:
spark.sql("select trip_id, route_id, destination, tstamp from "+ config['table_name_rt'] + " where trip_id between 999996 and 1000010").show(20,False)

+-------+--------+-------------+-------------------+
|trip_id|route_id|destination  |tstamp             |
+-------+--------+-------------+-------------------+
|1000000|A       |San Diego    |2020-07-21 23:14:32|
|1000001|B       |San Diego    |2020-07-21 23:14:32|
|1000002|C       |San Diego    |2020-07-21 23:14:32|
|1000003|D       |San Diego    |2020-07-21 23:14:32|
|1000004|E       |San Diego    |2020-07-21 23:14:32|
|1000005|F       |San Diego    |2020-07-21 23:14:32|
|1000006|G       |San Diego    |2020-07-21 23:14:32|
|1000007|H       |San Diego    |2020-07-21 23:14:32|
|1000008|I       |San Diego    |2020-07-21 23:14:32|
|1000009|J       |San Diego    |2020-07-21 23:14:32|
|1000010|A       |Seattle      |2020-07-21 23:12:05|
|999996 |G       |Washington DC|2020-07-21 23:12:05|
|999997 |H       |Philadelphia |2020-07-21 23:12:05|
|999998 |I       |Miami        |2020-07-21 23:12:05|
|999999 |J       |San Francisco|2020-07-21 23:12:05|
+-------+--------+-------------+-------------------+

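Check the S3 path again. Unlike with Copy-On-Write tables, the upsert did not create any new Parquet file: the changes were appended to a delta log file (the `.log.1_...` file below) next to the base file of the affected file group.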

```
$ aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_mor_trips_table/
                           PRE .hoodie/
2020-04-28 23:33:22       2071 .ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_20200428233020.log.1_0-227-3837
2020-04-28 23:30:20          0 .hoodie_$folder$
2020-04-28 23:30:26         93 .hoodie_partition_metadata
2020-04-28 23:30:33    4378000 45b1ce07-f9ac-496d-8b03-20af011a0c44-0_1-194-3566_20200428233020.parquet
2020-04-28 23:30:34    5048941 932d5e97-c5f0-4c91-a7f6-f65d487a5e2b-0_2-194-3567_20200428233020.parquet
2020-04-28 23:30:34    5065824 ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_0-194-3565_20200428233020.parquet

```

Now, let's compact the table using the Hudi CLI.

On EMR:

```
/usr/lib/hudi/cli/bin/hudi-cli.sh

hudi->connect --path s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_mor_trips_table/

hudi:hudi_mor_trips_table->compactions show all
╔═════════════════════════╤═══════╤═══════════════════════════════╗
║ Compaction Instant Time │ State │ Total FileIds to be Compacted ║
╠═════════════════════════╧═══════╧═══════════════════════════════╣
║ (empty)                                                         ║
╚═════════════════════════════════════════════════════════════════╝

```

Notice that there are no compactions for our MOR table yet. Let's manually schedule and run one:

```

hudi:hudi_mor_trips_table->compaction schedule
Compaction successfully completed for 20200428233601


hudi:hudi_mor_trips_table->connect --path s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_mor_trips_table/

hudi:hudi_mor_trips_table->compactions show all
20/04/28 23:39:04 INFO timeline.HoodieActiveTimeline: Loaded instants java.util.stream.ReferencePipeline$Head@240d2f9d
20/04/28 23:39:04 INFO s3n.S3NativeFileSystem: Opening 's3://<YOUR-S3-BUCKET>/tmp/hudi/hudi_mor_trips_table/.hoodie/.aux/20200428233601.compaction.requested' for reading
╔═════════════════════════╤═══════════╤═══════════════════════════════╗
║ Compaction Instant Time │ State     │ Total FileIds to be Compacted ║
╠═════════════════════════╪═══════════╪═══════════════════════════════╣
║ 20200428233601          │ REQUESTED │ 1                             ║
╚═════════════════════════╧═══════════╧═══════════════════════════════╝

hudi:hudi_mor_trips_table->compaction run --parallelism 10 --sparkMemory 4G --retry 2 --schemaFilePath s3://<YOUR-S3-BUCKET>/demos/schema.avsc --compactionInstant 20200428233601

Compaction successfully completed for 20200428233601

```

Note that `compaction schedule` only creates a compaction plan (the `REQUESTED` instant shown above); `compaction run` then executes that plan.

If we check the S3 path now, we can see that the delta log has been merged with its base file into a new Parquet file, whose name carries the compaction instant time 20200428233601:

```
$ aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_mor_trips_table/
                           PRE .hoodie/
2020-04-28 23:33:22       2071 .ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_20200428233020.log.1_0-227-3837
2020-04-28 23:30:20          0 .hoodie_$folder$
2020-04-28 23:30:26         93 .hoodie_partition_metadata
2020-04-28 23:30:33    4378000 45b1ce07-f9ac-496d-8b03-20af011a0c44-0_1-194-3566_20200428233020.parquet
2020-04-28 23:30:34    5048941 932d5e97-c5f0-4c91-a7f6-f65d487a5e2b-0_2-194-3567_20200428233020.parquet
2020-04-28 23:41:48    5065205 ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_0-0-0_20200428233601.parquet
2020-04-28 23:30:34    5065824 ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_0-194-3565_20200428233020.parquet

```
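
The listing now holds two base files for file ID ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0: the original file slice (instant 20200428233020, together with its log file) and the compacted slice (instant 20200428233601). The older slice should eventually be removed by the cleaner according to the commit retention settings. A minimal sketch that groups such a listing by file ID; the name layouts are inferred from the listings in this notebook rather than taken from Hudi's source, and `group_file_slices` is a hypothetical helper:

```python
from collections import defaultdict
from typing import Dict, List


def group_file_slices(names: List[str]) -> Dict[str, Dict[str, List[str]]]:
    """Group a data-file listing by file ID.

    Assumed layouts, inferred from the listings in this notebook:
      base file: <fileId>_<writeToken>_<instantTime>.parquet
      log file:  .<fileId>_<baseInstantTime>.log.<version>_<writeToken>
    """
    groups: Dict[str, Dict[str, List[str]]] = defaultdict(
        lambda: {"base_instants": [], "log_base_instants": []})
    for name in names:
        if name.endswith(".parquet"):
            file_id, _token, instant = name[: -len(".parquet")].rsplit("_", 2)
            groups[file_id]["base_instants"].append(instant)
        elif ".log." in name:
            prefix, _version_and_token = name.lstrip(".").split(".log.", 1)
            file_id, base_instant = prefix.rsplit("_", 1)
            groups[file_id]["log_base_instants"].append(base_instant)
        # Anything else (.hoodie_partition_metadata, folder markers) is ignored.
    return dict(groups)


listing = [
    ".ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_20200428233020.log.1_0-227-3837",
    ".hoodie_$folder$",
    ".hoodie_partition_metadata",
    "45b1ce07-f9ac-496d-8b03-20af011a0c44-0_1-194-3566_20200428233020.parquet",
    "932d5e97-c5f0-4c91-a7f6-f65d487a5e2b-0_2-194-3567_20200428233020.parquet",
    "ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_0-0-0_20200428233601.parquet",
    "ea6e8bfa-e70c-4f7e-90ec-37d018fb0acf-0_0-194-3565_20200428233020.parquet",
]
for file_id, info in sorted(group_file_slices(listing).items()):
    print(file_id[:8], sorted(info["base_instants"]), info["log_base_instants"])
# 45b1ce07 ['20200428233020'] []
# 932d5e97 ['20200428233020'] []
# ea6e8bfa ['20200428233020', '20200428233601'] ['20200428233020']
```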


In [46]:
%%local
!aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_mor_trips_table/

                           PRE .hoodie/
2020-07-21 23:14:47       2071 .9ac08b59-9f77-4ac6-bc91-38eeb193a864-0_20200721231223.log.1_0-214-1060
2020-07-21 23:12:24          0 .hoodie_$folder$
2020-07-21 23:12:43         93 .hoodie_partition_metadata
2020-07-21 23:12:49    3585873 45e39d48-6ed2-45cb-81b8-ffb8595a80c4-0_1-181-852_20200721231223.parquet
2020-07-21 23:12:57    5628093 57143caf-e2ec-4989-962d-9137fb840b75-0_2-181-853_20200721231223.parquet
2020-07-21 23:23:30    5264375 9ac08b59-9f77-4ac6-bc91-38eeb193a864-0_0-0-0_20200721231708.parquet
2020-07-21 23:12:52    5264997 9ac08b59-9f77-4ac6-bc91-38eeb193a864-0_0-181-851_20200721231223.parquet


## Working with Partitioned Tables

Let's do the same with partitioned tables. For this demo, we use route_id as the partition field. In practice, a nested date-based structure such as yyyy/mm/dd is more common.
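
As an illustration of the more common date-based layout, here is a minimal sketch that derives a yyyy/mm/dd partition path from the `tstamp` values used in this notebook. The helper and the `partition_path` column name in the comment are hypothetical, and syncing such a nested path to Hive needs matching partition field and extractor settings that this sketch does not cover.

```python
from datetime import datetime


def date_partition_path(tstamp: str, fmt: str = "%Y-%m-%d %H:%M:%S") -> str:
    """Build a nested yyyy/mm/dd partition path from a tstamp string."""
    return datetime.strptime(tstamp, fmt).strftime("%Y/%m/%d")


print(date_partition_path("2020-07-21 23:12:05"))  # 2020/07/21

# A PySpark equivalent on the DataFrame could be (not executed here):
# from pyspark.sql.functions import col, date_format
# df1 = df1.withColumn("partition_path", date_format(col("tstamp"), "yyyy/MM/dd"))
```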

In [47]:
## CHANGE ME ##
config = {
    "table_name": "hudi_partitioned_trips_table",
    "target": "s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_partitioned_trips_table",
    "primary_key": "trip_id",
    "sort_key": "tstamp",
    "commits_to_retain": "2",
    "partition_keys" : "route_id"
}

Let's generate the data:

In [48]:
part_dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson","Washington DC","Philadelphia","Miami","San Francisco"]
df1 = create_json_df(spark, get_json_data(0, 2000000, part_dest))

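Next, we prefix each route_id value with `route_id=` (for example, `A` becomes `route_id=A`), so that the partition folders follow the Hive-style `key=value` naming.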

In [49]:
from pyspark.sql.functions import concat, col, lit

hudiTablePartitionKey="route_id"
df1 = df1.withColumn(hudiTablePartitionKey,concat(lit("route_id="),col("route_id")))
df1.select(hudiTablePartitionKey).show(5)

+----------+
|  route_id|
+----------+
|route_id=A|
|route_id=B|
|route_id=C|
|route_id=D|
|route_id=E|
+----------+
only showing top 5 rows

We can now write the data to S3. Note that, compared with the non-partitioned writes, the Hive partition extractor class has changed and the partition fields are now set:

```
      .option(HIVE_PARTITION_FIELDS_OPT_KEY, config["partition_keys"])
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL)
      .option(PARTITIONPATH_FIELD_OPT_KEY,"route_id")
```


In [50]:
(df1.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option(BULK_INSERT_PARALLELISM, 6)
      .option(S3_CONSISTENCY_CHECK, "true")
      .option(HIVE_PARTITION_FIELDS_OPT_KEY, config["partition_keys"])
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL)
      .option(PARTITIONPATH_FIELD_OPT_KEY,"route_id")
      .mode("Overwrite")
      .save(config['target']))

In [51]:
spark.sql("show create table "+config['table_name']).show(20,False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|createtab_stmt                                                                                                                                                                                                             

We can see that the partition field is present in our Hive table:

```
PARTITIONED BY (`route_id` STRING)
```

Let's now query the data and group it by the partition column:

In [52]:
spark.sql("select route_id, count(*) as num_trips from "+config['table_name']+" group by route_id order by route_id").show(20,False)

+--------+---------+
|route_id|num_trips|
+--------+---------+
|A       |200000   |
|B       |200000   |
|C       |200000   |
|D       |200000   |
|E       |200000   |
|F       |200000   |
|G       |200000   |
|H       |200000   |
|I       |200000   |
|J       |200000   |
+--------+---------+
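
The even split follows from how the sample data is generated. Assuming `get_json_data` assigns route_id round-robin from A to J by trip_id (which is what the earlier sample output shows: trip 0 is route A, trip 10 is route A again), 2,000,000 trips divide into 10 partitions of 200,000 each. A quick plain-Python check of that arithmetic; `route_for` is a hypothetical stand-in for the generator's logic:

```python
from collections import Counter

ROUTES = "ABCDEFGHIJ"  # assumption: route_id cycles A..J with trip_id


def route_for(trip_id: int) -> str:
    """Route assigned to a trip under the round-robin assumption."""
    return ROUTES[trip_id % len(ROUTES)]


counts = Counter(route_for(i) for i in range(2_000_000))
print(counts["A"], len(counts))  # 200000 10
```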

Let us check the S3 path:

```
$ aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_partitioned_trips_table/
                           PRE .hoodie/
                           PRE route_id=A/
                           PRE route_id=B/
                           PRE route_id=C/
                           PRE route_id=D/
                           PRE route_id=E/
                           PRE route_id=F/
                           PRE route_id=G/
                           PRE route_id=H/
                           PRE route_id=I/
                           PRE route_id=J/
2020-04-28 23:42:50          0 .hoodie_$folder$
2020-04-28 23:42:55          0 route_id=A_$folder$
2020-04-28 23:42:55          0 route_id=B_$folder$
2020-04-28 23:42:57          0 route_id=C_$folder$
2020-04-28 23:42:55          0 route_id=D_$folder$
2020-04-28 23:42:57          0 route_id=E_$folder$
2020-04-28 23:42:55          0 route_id=F_$folder$
2020-04-28 23:42:55          0 route_id=G_$folder$
2020-04-28 23:42:57          0 route_id=H_$folder$
2020-04-28 23:42:55          0 route_id=I_$folder$
2020-04-28 23:42:58          0 route_id=J_$folder$

```

Each partition folder contains a `.hoodie_partition_metadata` file next to its Parquet data files:

```

$ aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_partitioned_trips_table/route_id=A/
2020-04-28 23:42:56         93 .hoodie_partition_metadata
2020-04-28 23:42:59    1723564 447b86e6-500c-463a-bdac-74abb867efad-0_0-256-4156_20200428234249.parquet

```

In [53]:
%%local
!aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_partitioned_trips_table/

                           PRE .hoodie/
                           PRE route_id=A/
                           PRE route_id=B/
                           PRE route_id=C/
                           PRE route_id=D/
                           PRE route_id=E/
                           PRE route_id=F/
                           PRE route_id=G/
                           PRE route_id=H/
                           PRE route_id=I/
                           PRE route_id=J/
2020-07-21 23:26:48          0 .hoodie_$folder$
2020-07-21 23:27:06          0 route_id=A_$folder$
2020-07-21 23:27:06          0 route_id=B_$folder$
2020-07-21 23:27:09          0 route_id=C_$folder$
2020-07-21 23:27:14          0 route_id=D_$folder$
2020-07-21 23:27:16          0 route_id=E_$folder$
2020-07-21 23:27:19          0 route_id=F_$folder$
2020-07-21 23:27:17          0 route_id=G_$folder$
2020-07-21 23:27:20          0 route_id=H_$folder$
2020-07-21 23:27:17          0 route_id=I_$folder$
20

In [55]:
%%local
!aws s3 ls s3://<YOUR-S3-BUCKET>/demos/hudi/hudi_partitioned_trips_table/route_id=A/

2020-07-21 23:27:07         93 .hoodie_partition_metadata
2020-07-21 23:27:12    1723514 fa19c764-2b15-44a1-a8ab-e606507c3792-0_0-237-1107_20200721232647.parquet


Other operations, such as upserts, behave the same way on partitioned tables.