# Apache Hudi Synapse Quickstart
![Apache Hudi and Azure Synapse Analytics](https://raw.githubusercontent.com/kywe665/azuredemos/main/img/ApacheHudi%20and%20Azure.png)

This notebook provides a simple example of how you can use Apache Hudi with Azure Synapse Analytics.

For this notebook to work, you need the Apache Hudi libraries loaded in your Synapse workspace and Spark pool.
See this blog post for a detailed walkthrough of the setup:

_**TODO Insert Blog Link**_

### Import Sample Dataset

```python
# Use the NYC Taxi (yellow cab) sample from Azure Open Datasets (azureml-opendatasets package)
from azureml.opendatasets import NycTlcYellow
from dateutil import parser

end_date = parser.parse('2018-06-06')
start_date = parser.parse('2018-05-01')
nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
filtered_df = nyc_tlc.to_spark_dataframe()

display(filtered_df)
```


## Set Hudi write configs
Choose a Hudi base path and set the basic write configs.

- Read about Hudi **record keys**, **precombine fields**, and other **configs** in the Hudi docs:
  - https://hudi.apache.org/docs/writing_data
- Read about Hudi **write operations** here:
  - https://hudi.apache.org/docs/write_operations
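The same four write options are repeated in every write cell of this notebook, with only the operation changing. As a sketch, a small helper can assemble them in one place. `hudi_write_options` is a hypothetical name of our own, not part of Hudi; the option keys are the ones this notebook already uses, and the set of operation names is an illustrative subset of those on the write operations page.

```python
# Hypothetical helper (not a Hudi API) that builds the write options used in
# this notebook, so the same keys are not retyped in every cell.

# Illustrative subset of Hudi write operation names (not an exhaustive list).
KNOWN_OPERATIONS = {
    "upsert", "insert", "bulk_insert", "delete",
    "insert_overwrite", "insert_overwrite_table",
}


def hudi_write_options(table_name, record_key, precombine_field, operation="upsert"):
    """Return a dict suitable for DataFrameWriter.options(**opts)."""
    if operation not in KNOWN_OPERATIONS:
        raise ValueError(f"unsupported write operation: {operation!r}")
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.precombine.field": precombine_field,
    }


if __name__ == "__main__":
    opts = hudi_write_options("hudi_test_data", "tpepPickupDateTime", "tpepDropoffDateTime")
    for key, value in opts.items():
        print(f"{key} = {value}")
```

In the notebook, the delete cell could then pass `**hudi_write_options(tableName, "tpepPickupDateTime", "tpepDropoffDateTime", "delete")` to `.options(...)`; that wiring is a suggestion and untested here.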

### Write the sample dataset to ADLS Gen2 as a Hudi table
All it takes is a single keyword swap: `df.write.format("hudi")`

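```python
# Replace with your own ADLS Gen2 container, storage account, and folder
basePath = "abfs://kwadlsfilesyshudi@kwadlshudi.dfs.core.windows.net/hudi-test/"
tableName = "hudi_test_data"

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'tpepPickupDateTime',   # key used to match updates and deletes
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'tpepDropoffDateTime'  # larger value wins when incoming keys collide
}

# mode("overwrite") writes a fresh table at basePath, replacing any existing one
filtered_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)
```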


### You can create a managed or external shared table with the `HUDI` keyword
https://docs.microsoft.com/en-us/azure/synapse-analytics/metadata/table#shared-spark-tables 

```python
# LOCATION points at the existing Hudi data, so this registers an external table
spark.sql("CREATE TABLE HudiTable USING HUDI LOCATION '{0}'".format(basePath))
```


### You can now take full advantage of SQL on your Hudi tables

```sql
%%sql
select * from HudiTable
```


_Output: Spark SQL result set with 1000 rows and 28 fields._

## Apache Hudi Upserts, Merges
Apache Hudi offers a first-of-its-kind [high performance indexing subsystem](https://www.onehouse.ai/blog/introducing-multi-modal-index-for-the-lakehouse-in-apache-hudi) for data lakes. With record-level indexes and ACID transactions, Hudi can perform upserts and merges quickly and efficiently.
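To make the roles of the record key and the precombine field concrete, here is a simplified in-memory model of an upsert. It is not how Hudi is implemented (Hudi uses its index to locate the files holding each key and rewrites them); it only mimics the outcome, assuming Hudi's default payload behaviour: duplicates within one incoming batch are reduced to the record with the largest precombine value, and that record then replaces the stored version. `precombine` and `upsert` are hypothetical names.

```python
# Simplified model of upsert semantics (illustration only, not Hudi code).
# Assumption: default behaviour where, within one incoming batch, the record
# with the largest precombine value wins, and it then replaces the stored row.

def precombine(batch, key_field, precombine_field):
    """Keep one record per key: the one with the largest precombine value."""
    latest = {}
    for record in batch:
        key = record[key_field]
        current = latest.get(key)
        if current is None or record[precombine_field] > current[precombine_field]:
            latest[key] = record
    return latest


def upsert(table, batch, key_field, precombine_field):
    """Return a new table dict: existing keys are updated, new keys inserted."""
    merged = dict(table)
    merged.update(precombine(batch, key_field, precombine_field))
    return merged


if __name__ == "__main__":
    table = {"k1": {"id": "k1", "ts": 1, "tip": 2.0}}  # hypothetical stored row
    batch = [
        {"id": "k1", "ts": 2, "tip": 5.23},  # update of an existing key
        {"id": "k1", "ts": 1, "tip": 9.99},  # older duplicate, dropped by precombine
        {"id": "k2", "ts": 1, "tip": 1.0},   # new key, inserted
    ]
    result = upsert(table, batch, "id", "ts")
    print(result["k1"]["tip"], sorted(result))  # 5.23 ['k1', 'k2']
```

The dict stands in for Hudi's record-level index: looking up a key tells the writer whether an incoming record is an update or an insert.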

Let's say that hours or days after a taxi trip is completed, the rider decides to change their tip.

For this example, let's grab a single record and inspect its original `tipAmount`:

```python
origvalue = spark. \
  read. \
  format("hudi"). \
  load(basePath). \
  where("_hoodie_record_key = '1526970901000000'")

display(origvalue.select("tipAmount"))
```
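The record key in the filter above is the value Hudi derived from `tpepPickupDateTime`. It appears to be the pickup timestamp in microseconds since the Unix epoch (UTC); that is an assumption inferred from the key values in this notebook, so check `_hoodie_record_key` on your own table before relying on it. Under that assumption, these hypothetical helpers convert between keys and timestamps, which makes it easier to pick other records to experiment with.

```python
# Hypothetical helpers (not part of Hudi). Assumption: record keys here are
# pickup timestamps as microseconds since the Unix epoch (UTC), which matches
# keys such as '1526970901000000'.
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def key_to_timestamp(record_key: str) -> datetime:
    """Interpret a record key string as microseconds since the epoch (UTC)."""
    return EPOCH + timedelta(microseconds=int(record_key))


def timestamp_to_key(ts: datetime) -> str:
    """Inverse of key_to_timestamp; ts must be timezone-aware."""
    return str((ts - EPOCH) // timedelta(microseconds=1))


if __name__ == "__main__":
    print(key_to_timestamp("1526970901000000"))  # 2018-05-22 06:35:01+00:00
```

If the conversion round-trips for your table, you can build a `where("_hoodie_record_key = ...")` filter from a known pickup time instead of copying keys from the displayed results.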


## Perform Upsert

- Set the Hudi write operation (`hoodie.datasource.write.operation`) to `upsert`
- Change `tipAmount` to `5.23` ($5.23)
- Write the updated record with save mode `append`

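```python
from pyspark.sql.functions import lit

# Same write options as the initial load; 'upsert' updates rows whose record key already exists
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'tpepPickupDateTime',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'tpepDropoffDateTime'
}

# Copy the original record and replace its tip
updatevalue = origvalue.withColumn("tipAmount", lit(5.23))

# mode("append") adds a new commit to the existing table instead of replacing it
updatevalue.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)
```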


#### Check Result of Upsert
You can see that the original record was updated with the new `tipAmount`.

```python
testupdate = spark. \
  read. \
  format("hudi"). \
  load(basePath). \
  where("_hoodie_record_key = '1526970901000000'")

display(testupdate.select("tipAmount"))
```


## Apache Hudi - Time Travel

With Apache Hudi you can write time travel queries to reproduce what a dataset looked like at a point in time.
You can specify the point in time with a commit instant, or a timestamp: https://hudi.apache.org/docs/quick-start-guide#time-travel-query
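Conceptually, a time travel read returns, for each record key, the latest version committed at or before the requested instant. The sketch below models that with a plain list of record versions. `snapshot_as_of` is a hypothetical name, the model ignores deletes and the file-level mechanics Hudi actually uses, and the history in the example is made up apart from the first instant and the `5.23` tip.

```python
# Simplified model of an "as of instant" read (illustration only, not Hudi code).

def snapshot_as_of(versions, as_of_instant):
    """Return {record_key: row} as it looked at as_of_instant (inclusive).

    versions: iterable of (commit_instant, record_key, row) tuples, where the
    instants are equal-length digit strings, so string order is time order.
    """
    snapshot = {}
    for instant, key, row in sorted(versions, key=lambda v: v[0]):
        if instant > as_of_instant:
            break  # every remaining version was committed later
        snapshot[key] = row  # a later version of the same key replaces an earlier one
    return snapshot


if __name__ == "__main__":
    # Hypothetical history: the second instant and the 1.0 tip are invented
    history = [
        ("20220624055125356", "1526970901000000", {"tipAmount": 1.0}),
        ("20220624060102117", "1526970901000000", {"tipAmount": 5.23}),
    ]
    print(snapshot_as_of(history, "20220624055125356"))  # {'1526970901000000': {'tipAmount': 1.0}}
    print(snapshot_as_of(history, "20220624060102117"))  # {'1526970901000000': {'tipAmount': 5.23}}
```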

There are several ways to find a commit instant: query the table details, use the Hudi CLI, or inspect storage. For simplicity, I opened the ADLS browser inside Synapse Studio and navigated to the folder where my data is saved.

Inside that folder is a **`.hoodie`** folder that holds the table's commit timeline; each completed commit appears as a file named `[commit instant].commit`. I picked the earliest one for this example.
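If you would rather not copy instants by hand, the file names in the `.hoodie` folder can be filtered programmatically. This sketch assumes a copy-on-write table (Hudi's default table type), where completed commits are files named `<instant>.commit` while `.commit.requested` and `.inflight` files mark commits that are not complete, and 17-digit instants of the form `yyyyMMddHHmmssSSS`. The helper names and the sample listing are hypothetical.

```python
# Hypothetical helpers (not a Hudi API) for picking commit instants out of a
# listing of <basePath>/.hoodie. Assumes a copy-on-write table and 17-digit
# yyyyMMddHHmmssSSS instants.
from datetime import datetime

SUFFIX = ".commit"


def completed_commit_instants(file_names):
    """Return instants of completed commits, oldest first."""
    return sorted(
        name[: -len(SUFFIX)]
        for name in file_names
        # ".commit.requested" does not end with ".commit", so it is skipped
        if name.endswith(SUFFIX) and name[: -len(SUFFIX)].isdigit()
    )


def instant_to_datetime(instant):
    """Parse a yyyyMMddHHmmssSSS instant into a naive datetime."""
    base = datetime.strptime(instant[:14], "%Y%m%d%H%M%S")
    return base.replace(microsecond=int(instant[14:17]) * 1000)


if __name__ == "__main__":
    listing = [  # hypothetical directory listing
        "hoodie.properties",
        "20220624055125356.commit.requested",
        "20220624055125356.inflight",
        "20220624055125356.commit",
        "20220624060102117.commit",  # hypothetical second commit
        "archived",
    ]
    instants = completed_commit_instants(listing)
    print(instants[0], instant_to_datetime(instants[0]))  # 20220624055125356 2022-06-24 05:51:25.356000
```

In a Synapse notebook, the listing could come from something like `[f.name for f in mssparkutils.fs.ls(basePath + ".hoodie")]`; that wiring is untested here.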

```python
# Browse the ADLS path to find the .hoodie folder and copy a commit instant
testupdate = spark. \
  read. \
  format("hudi"). \
  option("as.of.instant", "20220624055125356"). \
  load(basePath). \
  where("_hoodie_record_key = '1526970901000000'")

display(testupdate.select("tipAmount"))
# Reading as of the first commit, before the upsert, reproduces the original tipAmount
```


## Apache Hudi - Incremental Queries
Apache Hudi enables you to replace your old-school batch data pipelines with efficient incremental pipelines.

I can specify an `incremental` query type and ask Hudi for all the records that are new or updated after a given commit or timestamp.

_(for this example it is just that one row we updated, but feel free to play around with it more)_
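The incremental query's filter can be modelled in a few lines. As we read the Hudi docs for `hoodie.datasource.read.begin.instanttime` and `hoodie.datasource.read.end.instanttime`, rows whose commit time is strictly after the begin instant are returned, optionally capped by an inclusive end instant. This sketch applies that rule to plain dictionaries carrying the `_hoodie_commit_time` metadata column; `incremental_rows` is a hypothetical name, and the example rows are invented.

```python
# Simplified model of incremental-query filtering (illustration only).
# Assumption: begin instant is exclusive, end instant (if given) is inclusive.

def incremental_rows(rows, begin_instant, end_instant=None):
    """rows: dicts with a '_hoodie_commit_time' string, as in Hudi tables."""
    return [
        row for row in rows
        if row["_hoodie_commit_time"] > begin_instant
        and (end_instant is None or row["_hoodie_commit_time"] <= end_instant)
    ]


if __name__ == "__main__":
    # Hypothetical rows: only the upserted record carries the later commit time
    rows = [
        {"_hoodie_commit_time": "20220624055125356", "_hoodie_record_key": "other-record"},
        {"_hoodie_commit_time": "20220624060102117", "_hoodie_record_key": "1526970901000000"},
    ]
    for row in incremental_rows(rows, "20220624055125356"):
        print(row["_hoodie_record_key"])  # 1526970901000000
```

Because the begin instant in this notebook is the table's first commit, only rows rewritten by a later commit pass the filter, which is why the query returns just the updated row.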

```python
# Incrementally query data: return records committed after the begin instant
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': '20220624055125356',
}

tripsIncrementalDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(basePath)

display(tripsIncrementalDF)
```


## Apache Hudi - DELETES
Handling deletes is a challenging task for most data lakes, especially when complying with regulations such as GDPR.

Apache Hudi processes deletes quickly and efficiently while offering advanced concurrency control configurations: https://hudi.apache.org/blog/2021/12/16/lakehouse-concurrency-control-are-we-too-optimistic

An example of how to process a delete:
1. Query the records you want to delete
2. Set hoodie write operation to `delete`
3. Write those records with save mode `append`
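In terms of table contents, a `delete` write can be modelled as "remove every stored row whose key appears in the incoming batch". This simplified sketch assumes that only the record keys of the batch matter (plus partition paths on partitioned tables) and that keys not present in the table are ignored. `apply_deletes` is a hypothetical name, and the model says nothing about how Hudi rewrites files or handles concurrency.

```python
# Simplified model of a Hudi "delete" write (illustration only, not Hudi code).

def apply_deletes(table, delete_batch, key_field):
    """Return (new_table, number_of_rows_removed); the input table is not modified."""
    keys = {record[key_field] for record in delete_batch}
    remaining = {k: v for k, v in table.items() if k not in keys}
    return remaining, len(table) - len(remaining)


if __name__ == "__main__":
    table = {"1527015638000000": "trip A", "1526970901000000": "trip B"}  # hypothetical rows
    batch = [{"_hoodie_record_key": "1527015638000000"}]
    remaining, removed = apply_deletes(table, batch, "_hoodie_record_key")
    print(removed, sorted(remaining))  # 1 ['1526970901000000']
```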

```python
todelete = spark. \
  read. \
  format("hudi"). \
  load(basePath). \
  where("_hoodie_record_key = '1527015638000000'")

display(todelete)
```

```python
# Same configuration as before, with the write operation switched to 'delete'
hudi_delete_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'tpepPickupDateTime',
    'hoodie.datasource.write.operation': 'delete',
    'hoodie.datasource.write.precombine.field': 'tpepDropoffDateTime'
}

todelete.write.format("hudi").options(**hudi_delete_options).mode("append").save(basePath)
```

### Confirm delete was successful

```python
testdelete = spark. \
  read. \
  format("hudi"). \
  load(basePath). \
  where("_hoodie_record_key = '1527015638000000'")

display(testdelete)
```