# Hudi Quick Start with Spark

This Jupyter notebook is based on the Quickstart guide on the official Apache Hudi website. The guide is helpful, but a few tweaks were needed for this notebook to run successfully in an EMR notebook.

Source:
https://hudi.apache.org/docs/quick-start-guide

### Jupyter Notebook Configuration

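This cell configures the EMR notebook's Spark session to use Hudi. It adds the Hudi Spark bundle and spark-avro JARs from HDFS, sets Kryo as the Spark serializer, and sets `spark.sql.hive.convertMetastoreParquet` to `false` so that Spark reads Hudi tables registered in the Hive metastore through Hudi's input format instead of its built-in Parquet reader.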

In [1]:
%%configure
{ "conf": {
            "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
            "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
            "spark.sql.hive.convertMetastoreParquet":"false"
          }}
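If the JAR locations differ between clusters, the `%%configure` body can be generated from a Python dict instead of being edited by hand. This is a sketch only: `HUDI_SPARK_CONF` and `configure_body` are hypothetical names, the JAR paths are the ones used above, and the `{"conf": {...}}` shape is copied from the cell rather than taken from the EMR or sparkmagic documentation.

```python
import json

# Spark settings copied from the %%configure cell above.
HUDI_SPARK_CONF = {
    "spark.jars": ",".join([
        "hdfs:///apps/hudi/lib/hudi-spark-bundle.jar",
        "hdfs:///apps/hudi/lib/spark-avro.jar",
    ]),
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.hive.convertMetastoreParquet": "false",
}


def configure_body(conf):
    """Hypothetical helper: wrap Spark settings in the {"conf": {...}} shape used by %%configure."""
    return json.dumps({"conf": conf}, indent=2)


# Paste the printed JSON below a %%configure line.
print(configure_body(HUDI_SPARK_CONF))
```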

### Test Data Preparation

This section prepares the test data. The data is generated by `sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()`, the generator used in the Quickstart guide.

Test Data Schema
```
begin_lat: double
begin_lon: double
driver: string
end_lat: double
end_lon: double
fare: double
partitionpath: string
rider: string
ts: long
uuid: string
```

Partition column: the `partitionpath` column is used as the partition column. Sample partition values:
```
americas/united_states/san_francisco
americas/brazil/sao_paulo           
asia/india/chennai                  
```

Because each `partitionpath` value is written as nested folders (prefixes in S3) under the table's base path, the S3 paths look like this (the file name is a placeholder):
```
s3://bucket-name/hudi_trips_cow/americas/united_states/san_francisco/actualData.parquet
```
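The mapping from a `partitionpath` value to its S3 prefix is plain string concatenation, which is also what the Redshift `LOCATION` clauses further down rely on. A minimal sketch; `partition_prefix` is a hypothetical helper and `s3://bucket-name/hudi_trips_cow` is a placeholder base path.

```python
def partition_prefix(base_path: str, partition_value: str) -> str:
    """Hypothetical helper: return the folder (S3 prefix) that holds one partition's files.

    A partitionpath value such as 'americas/brazil/sao_paulo' becomes nested
    folders directly under the table's base path.
    """
    return base_path.rstrip("/") + "/" + partition_value.strip("/")


for value in [
    "americas/united_states/san_francisco",
    "americas/brazil/sao_paulo",
    "asia/india/chennai",
]:
    print(partition_prefix("s3://bucket-name/hudi_trips_cow", value))
```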


In [3]:
# Import necessary libraries
from pyspark.sql.functions import *

In [12]:
# Define variables.
tableName = "hudi_trips_cow"
basePath = "s3://ecs-bi-dev-datalake-raw/hudi_trips_cow"

# basePathWithPartition
# Each * matches one folder level (a prefix in S3) of the partition path.
# The last * matches the actual Parquet files that contain the data.
basePathWithPartition = basePath + "/*/*/*/*"
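The number of `*` segments follows from the partition depth: the partition values have three levels (region/country/city), plus one final level for the data files. A minimal sketch that derives the glob from one sample partition value, assuming all partitions have the same depth; `partition_glob` is a hypothetical helper.

```python
def partition_glob(base_path: str, sample_partition: str) -> str:
    """Hypothetical helper: build the glob used to read the table.

    One '*' per partition folder level, plus one final '*' for the data
    files inside the leaf folder. Assumes every partition has the same depth.
    """
    levels = len(sample_partition.strip("/").split("/"))
    return base_path.rstrip("/") + "/*" * (levels + 1)


print(partition_glob("s3://ecs-bi-dev-datalake-raw/hudi_trips_cow", "americas/brazil/sao_paulo"))
# s3://ecs-bi-dev-datalake-raw/hudi_trips_cow/*/*/*/*
```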

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Instantiate DataGenerator and generate 10 new records for inserts.
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))

In [6]:
# Create a df based on the data generated above.
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

In [7]:
df.printSchema()

root
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)

In [10]:
df.show(10, False)

+-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
|begin_lat          |begin_lon          |driver    |end_lat            |end_lon            |fare              |partitionpath                       |rider    |ts           |uuid                                |
+-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
|0.4726905879569653 |0.46157858450465483|driver-213|0.754803407008858  |0.9671159942018241 |34.158284716382845|americas/brazil/sao_paulo           |rider-213|1640394420617|add08df1-927e-4268-ba67-79f23e7309f1|
|0.6100070562136587 |0.8779402295427752 |driver-213|0.3407870505929602 |0.5030798142293655 |43.4923811219014  |americas/brazil/sao_paulo           |rider-213|16

In [11]:
# List the distinct partition paths.
df.select('partitionpath').distinct().collect()

[Row(partitionpath='americas/united_states/san_francisco'), Row(partitionpath='asia/india/chennai'), Row(partitionpath='americas/brazil/sao_paulo')]

In [13]:
# Specify common DataSourceWriteOptions in the single hudi_options variable.
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}
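`hoodie.datasource.write.precombine.field` decides which record wins when one incoming batch contains several records with the same record key (`uuid`): the one with the largest `ts` is kept. The sketch below is a simplified pure-Python model of that rule, not Hudi's implementation; `precombine` is a hypothetical helper, and keeping the first record on a tie is an assumption of the sketch.

```python
def precombine(records, key_field="uuid", precombine_field="ts"):
    """Simplified model of precombining one incoming batch.

    For records sharing the same key, keep the one with the largest
    precombine value. On a tie this sketch keeps the first record seen
    (an assumption, not documented Hudi behavior).
    """
    latest = {}
    for rec in records:
        key = rec[key_field]
        if key not in latest or rec[precombine_field] > latest[key][precombine_field]:
            latest[key] = rec
    return list(latest.values())


batch = [
    {"uuid": "a", "ts": 1, "fare": 10.0},
    {"uuid": "a", "ts": 3, "fare": 30.0},
    {"uuid": "b", "ts": 2, "fare": 20.0},
]
print(precombine(batch))
# [{'uuid': 'a', 'ts': 3, 'fare': 30.0}, {'uuid': 'b', 'ts': 2, 'fare': 20.0}]
```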

In [None]:
# Write the data into S3.
# mode("overwrite") overwrites and recreates the table if it already exists.
df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)

### Query Data with Redshift Spectrum
This SQL script can be run in Redshift to create an external table and its partitions for the test dataset. The result of the final SELECT statement should match the initial test dataset.

```
DROP TABLE IF EXISTS datalake.hudi_trips_cow;

CREATE EXTERNAL TABLE datalake.hudi_trips_cow 
    (
        begin_lat       Double Precision,
        begin_lon       Double Precision,   
        driver          varchar(64),   
        end_lat         Double Precision,   
        end_lon         Double Precision,     
        fare            Double Precision,      
        rider           varchar(256),  
        ts              bigint,  
        uuid            varchar(256)
    )
PARTITIONED BY(partitionpath varchar(256))
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://ecs-bi-dev-datalake-raw/hudi_trips_cow';


ALTER TABLE datalake.hudi_trips_cow
ADD IF NOT EXISTS PARTITION(partitionpath = 'americas/brazil/sao_paulo')
LOCATION 's3://ecs-bi-dev-datalake-raw/hudi_trips_cow/americas/brazil/sao_paulo';

ALTER TABLE datalake.hudi_trips_cow
ADD IF NOT EXISTS PARTITION(partitionpath = 'americas/united_states/san_francisco')
LOCATION 's3://ecs-bi-dev-datalake-raw/hudi_trips_cow/americas/united_states/san_francisco';

ALTER TABLE datalake.hudi_trips_cow
ADD IF NOT EXISTS PARTITION(partitionpath = 'asia/india/chennai')
LOCATION 's3://ecs-bi-dev-datalake-raw/hudi_trips_cow/asia/india/chennai';

select *
from   datalake.hudi_trips_cow;
```
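When there are more partitions, the `ALTER TABLE ... ADD PARTITION` statements can be generated instead of typed by hand, for example from the values returned by `df.select('partitionpath').distinct().collect()` earlier. A minimal sketch; `add_partition_sql` is a hypothetical helper that copies the statement shape of the script above and assumes trusted partition values with no single quotes in them.

```python
def add_partition_sql(table: str, base_path: str, partition_values) -> str:
    """Hypothetical helper: one ALTER TABLE ... ADD IF NOT EXISTS PARTITION per value.

    Values are inserted verbatim, so they must not contain single quotes.
    """
    statements = []
    for value in partition_values:
        location = base_path.rstrip("/") + "/" + value.strip("/")
        statements.append(
            f"ALTER TABLE {table}\n"
            f"ADD IF NOT EXISTS PARTITION(partitionpath = '{value}')\n"
            f"LOCATION '{location}';"
        )
    return "\n\n".join(statements)


print(add_partition_sql(
    "datalake.hudi_trips_cow",
    "s3://ecs-bi-dev-datalake-raw/hudi_trips_cow",
    ["americas/brazil/sao_paulo", "asia/india/chennai"],
))
```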

### Query Data in PySpark

Load the data files into a DataFrame.

In [14]:
tripsSnapshotDF = spark. \
  read. \
  format("hudi"). \
  load(basePathWithPartition)
# load(basePath) relies on the "/partitionKey=partitionValue" folder structure for Spark's automatic partition discovery

In [None]:
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

In [None]:
spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show(20, False)

In [None]:
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show(20, False)

### Time Travel Query

Time travel lets you query the table as it existed at an earlier commit, including data that has since been changed or deleted, as long as the files for that commit are still retained. It is useful for tasks such as:

- Recovering data that was accidentally or intentionally changed or deleted.
- Duplicating and backing up data from key points in the past.
- Analyzing how data was used or changed over specific periods of time.

Hudi has supported time travel queries since version 0.9.0. Three formats are currently supported for the query instant, as shown below.

In [15]:
timeTravelDf = spark.read. \
  format("hudi"). \
  option("as.of.instant", "20210728141108"). \
  load(basePathWithPartition)

"""
spark.read. \
  format("hudi"). \
  option("as.of.instant", "2021-07-28 14:11:08"). \
  load(basePath)

# It is equal to "as.of.instant = 2021-07-28 00:00:00"
spark.read. \
  format("hudi"). \
  option("as.of.instant", "2021-07-28"). \
  load(basePath)
"""

timeTravelDf.show(20,False)

+-------------------+--------------------+------------------------------------+------------------------------------+----------------------------------------------------------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key                  |_hoodie_partition_path              |_hoodie_file_name                                                     |begin_lat          |begin_lon          |driver    |end_lat            |end_lon            |fare              |partitionpath                       |rider    |ts           |uuid                                |
+-------------------+--------------------+------------------------------------+------------------------------------+----------------------------------------------------------------------+-------------------+-
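The three `as.of.instant` spellings shown above all describe a point in time. A minimal sketch that normalizes them to the 14-digit `yyyyMMddHHmmss` form seen in `_hoodie_commit_time` in this notebook, which is handy when comparing a requested instant against listed commit times. `to_instant` is a hypothetical helper, and seconds precision is an assumption: newer Hudi versions may write longer, millisecond-precision instants.

```python
from datetime import datetime

# The three as.of.instant formats from the cell above.
_FORMATS = ("%Y%m%d%H%M%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d")


def to_instant(value: str) -> str:
    """Hypothetical helper: normalize an as.of.instant string to yyyyMMddHHmmss."""
    for fmt in _FORMATS:
        try:
            return datetime.strptime(value, fmt).strftime("%Y%m%d%H%M%S")
        except ValueError:
            continue  # try the next accepted format
    raise ValueError(f"unsupported as.of.instant format: {value!r}")


print(to_instant("2021-07-28 14:11:08"))  # 20210728141108
print(to_instant("2021-07-28"))           # 20210728000000
```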

### Update Data

Updating data is similar to inserting it: generate updates to existing trips with the data generator, load them into a DataFrame, and write the DataFrame to the Hudi table.
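Conceptually, an upsert keyed by `uuid` replaces the stored record when the key already exists and inserts it otherwise. The sketch below is a simplified pure-Python model of that idea: it ignores partitions, commit metadata, and payload classes, and it assumes the incoming record simply overwrites the stored one. `upsert` is a hypothetical helper, not a Hudi API.

```python
def upsert(table: dict, incoming, key_field="uuid"):
    """Simplified model of an upsert into a table keyed by record key.

    Existing keys are overwritten by the incoming record; new keys are
    inserted. The input table is not modified.
    """
    merged = dict(table)
    for rec in incoming:
        merged[rec[key_field]] = rec
    return merged


table = {
    "a": {"uuid": "a", "ts": 1, "fare": 10.0},
    "b": {"uuid": "b", "ts": 1, "fare": 20.0},
}
updates = [{"uuid": "a", "ts": 2, "fare": 15.0}, {"uuid": "c", "ts": 2, "fare": 5.0}]
result = upsert(table, updates)
print(len(result), result["a"]["fare"])  # 3 15.0
```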


In [16]:
# Generate 5 update records using QuickstartUtils.
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(5))

In [17]:
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

In [18]:
df.printSchema()

root
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)

In [19]:
df.show(10, False)

+-------------------+--------------------+----------+------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
|begin_lat          |begin_lon           |driver    |end_lat           |end_lon            |fare              |partitionpath                       |rider    |ts           |uuid                                |
+-------------------+--------------------+----------+------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
|0.7340133901254792 |0.5142184937933181  |driver-284|0.7814655558162802|0.6592596683641996 |49.527694252432056|americas/united_states/san_francisco|rider-284|1640773277931|93113d38-7a04-40c8-a468-e0534e7a74e7|
|0.1593867607188556 |0.010872312870502165|driver-284|0.9808530350038475|0.7963756520507014 |29.47661370147079 |americas/brazil/sao_paulo           |rider-284|16

In [25]:
# Notice here that the mode is append, not overwrite.
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)

Once the update succeeds, query the table again, either through the Redshift Spectrum table or with PySpark, and confirm that the data was updated correctly. Compare rows by the `uuid` column (the record key).
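The before/after check can be scripted by indexing both snapshots on `uuid`. A minimal sketch, assuming each snapshot has been collected into a list of dicts, for example with `[r.asDict() for r in df.collect()]` on the `hudi_trips_snapshot` view; `changed_keys` is a hypothetical helper, and the `_hoodie_*` metadata columns are dropped so that only data columns are compared.

```python
def changed_keys(before, after, key_field="uuid"):
    """Hypothetical helper: record keys present in both snapshots whose data columns differ."""

    def index(rows):
        # Drop Hudi metadata columns and key each row by its record key.
        return {
            row[key_field]: {k: v for k, v in row.items() if not k.startswith("_hoodie_")}
            for row in rows
        }

    b, a = index(before), index(after)
    return sorted(k for k in a if k in b and a[k] != b[k])


before = [
    {"uuid": "a", "fare": 10.0, "_hoodie_commit_time": "20210101000000"},
    {"uuid": "b", "fare": 20.0, "_hoodie_commit_time": "20210101000000"},
]
after = [
    {"uuid": "a", "fare": 15.0, "_hoodie_commit_time": "20210102000000"},
    {"uuid": "b", "fare": 20.0, "_hoodie_commit_time": "20210101000000"},
]
print(changed_keys(before, after))  # ['a']
```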

### Incremental Query

Hudi also provides the capability to obtain a stream of records that changed since a given commit timestamp. This is done with Hudi's incremental query, by providing a begin time from which changes should be streamed. An end time is not needed if we want all changes after the given commit (the common case).
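The selection rule can be modeled in plain Python: the cell that follows takes the second-to-last distinct commit time as `beginTime`, and, per Hudi's configuration reference, the incremental query returns records with a commit time strictly greater than the begin instant and, when `hoodie.datasource.read.end.instanttime` is set, less than or equal to the end instant. A simplified sketch; `pick_begin_time` and `incremental` are hypothetical helpers and the commit times are made-up examples. Because the instants are fixed-width digit strings, plain string comparison orders them chronologically.

```python
def pick_begin_time(commit_times):
    """Second-to-last distinct commit time, so that only the latest commit is returned."""
    commits = sorted(set(commit_times))
    if len(commits) < 2:
        raise ValueError("need at least two commits to pick a begin time")
    return commits[-2]


def incremental(records, begin_time, end_time=None):
    """Simplified model: begin_time < commit time (<= end_time, if given)."""
    return [
        r for r in records
        if r["_hoodie_commit_time"] > begin_time
        and (end_time is None or r["_hoodie_commit_time"] <= end_time)
    ]


records = [
    {"uuid": "a", "_hoodie_commit_time": "20210101000000"},
    {"uuid": "b", "_hoodie_commit_time": "20210102000000"},
    {"uuid": "c", "_hoodie_commit_time": "20210103000000"},
]
begin = pick_begin_time(r["_hoodie_commit_time"] for r in records)
print(begin)                                        # 20210102000000
print([r["uuid"] for r in incremental(records, begin)])  # ['c']
```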

In [27]:
# reload data
spark. \
  read. \
  format("hudi"). \
  load(basePathWithPartition). \
  createOrReplaceTempView("hudi_trips_snapshot")

commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").limit(50).collect()))
beginTime = commits[len(commits) - 2] # commit time we are interested in

# incrementally query data
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': beginTime,
}

tripsIncrementalDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(basePathWithPartition)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

In [28]:
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

+-------------------+------------------+-------------------+-------------------+-------------+
|_hoodie_commit_time|              fare|          begin_lon|          begin_lat|           ts|
+-------------------+------------------+-------------------+-------------------+-------------+
|     20211230202633|49.527694252432056| 0.5142184937933181| 0.7340133901254792|1640773277931|
|     20211230202633| 63.72504913279929|  0.888493603696927| 0.6570857443423376|1640881999594|
|     20211230202633| 86.75932789048282|0.13755354862499358| 0.7180196467760873|1640829595043|
|     20211230202633| 90.25710109008239| 0.4006983139989222|0.08528650347654165|1640592065829|
+-------------------+------------------+-------------------+-------------------+-------------+

### Delete Data

You can delete records as shown below, by writing their record keys with the `delete` write operation. Alternatively, you can hard delete a record by upserting an empty payload.
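The DataFrame written with the `delete` operation carries both `uuid` and `partitionpath`. Assuming the default, non-global index, a record is located by its record key together with its partition path, so a key paired with the wrong partition deletes nothing. The sketch below is a simplified pure-Python model of that behavior; `hard_delete` is a hypothetical helper and the table contents are made up.

```python
def hard_delete(table: dict, keys):
    """Simplified model of a delete write: drop records by (record key, partition path).

    Keys that do not match an existing record are ignored.
    """
    doomed = set(keys)
    return {k: v for k, v in table.items() if k not in doomed}


table = {
    ("a", "asia/india/chennai"): {"fare": 10.0},
    ("b", "americas/brazil/sao_paulo"): {"fare": 20.0},
    ("c", "asia/india/chennai"): {"fare": 30.0},
}
# "b" is paired with the wrong partition here, so it is not deleted.
remaining = hard_delete(table, [("a", "asia/india/chennai"), ("b", "asia/india/chennai")])
print(sorted(k[0] for k in remaining))  # ['b', 'c']
```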

In [36]:
# Before Delete Count
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

14

In [37]:
# Fetch two records to be deleted
ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

In [38]:
# A separate Hudi config for deletes: identical to hudi_options except that the operation is 'delete'.
hudi_delete_options = {
  'hoodie.table.name': tableName,
  'hoodie.datasource.write.recordkey.field': 'uuid',
  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  'hoodie.datasource.write.table.name': tableName,
  'hoodie.datasource.write.operation': 'delete',
  'hoodie.datasource.write.precombine.field': 'ts',
  'hoodie.upsert.shuffle.parallelism': 2, 
  'hoodie.insert.shuffle.parallelism': 2
}
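Since `hudi_delete_options` differs from `hudi_options` only in `hoodie.datasource.write.operation`, it can also be derived with a dict merge, which keeps the two configs from drifting apart. A minimal sketch; the `hudi_options` dict is repeated here only so that the block runs on its own.

```python
tableName = "hudi_trips_cow"

# Same write options as the hudi_options cell earlier in the notebook.
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

# Copy every option, then override only the write operation.
hudi_delete_options = {**hudi_options, 'hoodie.datasource.write.operation': 'delete'}
print(hudi_delete_options['hoodie.datasource.write.operation'])  # delete
```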

In [39]:
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))

In [40]:
df.show(20,False)

+------------------------------------+------------------------------------+---+
|uuid                                |partitionpath                       |ts |
+------------------------------------+------------------------------------+---+
|2b6e0308-7ce6-458b-8d00-eaf96cc57bea|americas/united_states/san_francisco|0.0|
|5f5898b6-c6f1-4381-ab31-55ee8726c120|americas/united_states/san_francisco|0.0|
+------------------------------------+------------------------------------+---+

In [41]:
df.write.format("hudi"). \
  options(**hudi_delete_options). \
  mode("append"). \
  save(basePath)

In [43]:
# run the same read query as above.
roAfterDeleteViewDF = spark. \
  read. \
  format("hudi"). \
  load(basePathWithPartition) 
roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
# fetch should return (total - 2) records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

12