In [1]:
%%configure -f

{
    "conf": {
        "spark.jars.packages": "org.apache.hudi:hudi-spark3.1-bundle_2.12:0.12.2"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8,application_1675270889187_0012,pyspark,idle,Link,Link,


In [2]:
# Create sample data
# Three example records; each tuple is (id, state, age, salary, date).
new_rows = [
    ("234fr", "CA", 22, 45000, "2022-05-22"),
    ("edf56", "AK", 35, 65000, "2022-06-22"),
    ("001uj", "WA", 50, 85000, "2022-07-22"),
]

# Build the demo DataFrame with explicit column names and preview it.
demo_df = spark.createDataFrame(
    new_rows,
    schema=["id", "state", "age", "salary", "date"],
)
demo_df.show()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9,application_1675270889187_0013,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.
+-----+-----+---+------+----------+
|   id|state|age|salary|      date|
+-----+-----+---+------+----------+
|234fr|   CA| 22| 45000|2022-05-22|
|edf56|   AK| 35| 65000|2022-06-22|
|001uj|   WA| 50| 85000|2022-07-22|
+-----+-----+---+------+----------+

In [3]:
# Set Hudi write configs
# Storage location (abfs:// path) and name of the Hudi table used by the
# read/write cells in this notebook.
basePath = "abfs://default@stmurggu9clngm.dfs.core.windows.net/hudi-test/"
tableName = "hudi_test_data"

# Options passed to the Hudi DataFrame writer.
hudi_options = {
    "hoodie.table.name": tableName,
    # The `id` column supplies the record key of each row.
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.operation": "upsert",
    # Precombine field; see the Hudi write-config docs for how it is used
    # when several incoming rows share the same key.
    "hoodie.datasource.write.precombine.field": "date",
}

In [4]:
# Write the sample dataset to ADLS G2 as a Hudi table
# The "overwrite" save mode is used for this initial write to basePath.
(
    demo_df.write.format("hudi")
    .options(**hudi_options)
    .mode("overwrite")
    .save(basePath)
)

In [5]:
# Create a SQL Table
# Drop any previous definition first so the CREATE below does not collide
# with an existing HudiDemoTable when this cell is re-run.
spark.sql("DROP TABLE IF EXISTS HudiDemoTable")

# Register the Hudi data stored at basePath under the name HudiDemoTable.
spark.sql(
    "CREATE TABLE HudiDemoTable USING HUDI LOCATION '{location}'".format(
        location=basePath
    )
)

DataFrame[]

In [6]:
%%sql
SELECT * FROM HudiDemoTable

VBox(children=(HBox(children=(HTML(value=u'Type:'), Button(description=u'Table', layout=Layout(width=u'70px'),…

Output()

In [7]:
# Upserts / Merges
# Read the Hudi table and keep only the record whose Hudi record key is
# 'edf56'; this DataFrame is the starting point for the update that follows.
origvalue = (
    spark.read.format("hudi")
    .load(basePath)
    .where("_hoodie_record_key = 'edf56'")
)

# Show the record's current `state` value before it is changed.
origvalue.select("state").show()

+-----+
|state|
+-----+
|   AK|
+-----+

In [8]:
from pyspark.sql.functions import lit

# The upsert write options (`hudi_options`: table name, record key `id`,
# operation 'upsert', precombine field `date`) are already defined alongside
# basePath/tableName earlier in the notebook. They are reused here instead of
# being re-declared as an identical copy, so the initial write and this
# update cannot drift apart.

# Take the record selected in `origvalue` and set its `state` to "WA".
updatevalue = origvalue.withColumn("state", lit("WA"))

# Append-mode write with the 'upsert' operation: the changed row is written
# back to the existing table at basePath.
(
    updatevalue.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save(basePath)
)

In [9]:
# Re-read the record keyed 'edf56' to verify the upsert took effect.
testupdate = (
    spark.read.format("hudi")
    .load(basePath)
    .where("_hoodie_record_key = 'edf56'")
)

# `state` is expected to show the updated value now.
testupdate.select("state").show()

+-----+
|state|
+-----+
|   WA|
+-----+

In [10]:
# Time travel
# Derive the instant to travel back to from the table itself instead of
# hard-coding a commit timestamp copied from one particular run: the table is
# rewritten (and gets new commit instants) every time the notebook is re-run,
# so a literal value would stop matching any commit.
# Rows that were not touched by the upsert still carry the commit time of the
# initial write, so the smallest `_hoodie_commit_time` in the current snapshot
# identifies that first commit. Commit times are fixed-width digit strings,
# so the string minimum is also the chronological minimum.
first_commit_time = (
    spark.read.format("hudi")
    .load(basePath)
    .agg({"_hoodie_commit_time": "min"})
    .collect()[0][0]
)
assert first_commit_time is not None, "Hudi table at basePath contains no records"

# Query the table as of the first commit, i.e. before the upsert was applied.
testupdate = (
    spark.read.format("hudi")
    .option("as.of.instant", first_commit_time)
    .load(basePath)
    .where("_hoodie_record_key = 'edf56'")
)

# `state` is expected to show the original (pre-update) value.
testupdate.select("state").show()

+-----+
|state|
+-----+
|   AK|
+-----+

In [11]:
# Incremental Queries
# Use the table's earliest commit time as the starting point rather than a
# hard-coded timestamp from a previous run (commit instants change whenever
# the table is rewritten). Rows untouched by later writes keep the commit
# time of the initial write, so the minimum `_hoodie_commit_time` in the
# snapshot is that first commit; the value is kept as a string, matching how
# Hudi exposes it in the `_hoodie_commit_time` column.
begin_instant_time = (
    spark.read.format("hudi")
    .load(basePath)
    .agg({"_hoodie_commit_time": "min"})
    .collect()[0][0]
)
assert begin_instant_time is not None, "Hudi table at basePath contains no records"

# Incremental read: only changes committed after `begin_instant_time` are
# returned, so the initial load is excluded and the later upsert is shown.
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': begin_instant_time,
}

demo_df_incremental = (
    spark.read.format("hudi")
    .options(**incremental_read_options)
    .load(basePath)
)

demo_df_incremental.show()

+-------------------+--------------------+------------------+----------------------+--------------------+-----+-----+---+------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|   id|state|age|salary|      date|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+-----+---+------+----------+
|  20230203132829588|20230203132829588...|             edf56|                      |74bfd860-d712-4ca...|edf56|   WA| 35| 65000|2022-06-22|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+-----+---+------+----------+

In [12]:
# Deletes
# Select the record keyed '234fr'; this DataFrame identifies the row that
# will be removed from the table.
todelete = (
    spark.read.format("hudi")
    .load(basePath)
    .where("_hoodie_record_key = '234fr'")
)

# Display the full row (including the _hoodie_* metadata columns).
todelete.show()

+-------------------+--------------------+------------------+----------------------+--------------------+-----+-----+---+------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|   id|state|age|salary|      date|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+-----+---+------+----------+
|  20230203132655256|20230203132655256...|             234fr|                      |74bfd860-d712-4ca...|234fr|   CA| 22| 45000|2022-05-22|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+-----+---+------+----------+

In [13]:
# Writer options for the delete: same table name, record key and precombine
# field as the upsert options, but with the write operation set to 'delete'.
hudi_delete_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.operation": "delete",
    "hoodie.datasource.write.precombine.field": "date",
}

# Issue the delete by writing the selected row(s) in append mode.
(
    todelete.write.format("hudi")
    .options(**hudi_delete_options)
    .mode("append")
    .save(basePath)
)

Exception in thread Thread-42:
Traceback (most recent call last):
  File "/usr/bin/anaconda/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/bin/anaconda/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/bin/anaconda/lib/python2.7/site-packages/sparkprogressindicator/sparkmonitorbackend.py", line 196, in _check_jobs
    self._send_msgs_for_fast_job(next_job)
  File "/usr/bin/anaconda/lib/python2.7/site-packages/sparkprogressindicator/sparkmonitorbackend.py", line 297, in _send_msgs_for_fast_job
    self._send_msgs_for_fast_stage(stage, job["jobId"])
  File "/usr/bin/anaconda/lib/python2.7/site-packages/sparkprogressindicator/sparkmonitorbackend.py", line 303, in _send_msgs_for_fast_stage
    self._send_msgs_for_fast_task(task, stage)
  File "/usr/bin/anaconda/lib/python2.7/site-packages/sparkprogressindicator/sparkmonitorbackend.py", line 308, in _send_msgs_for_fast_task
    self._send_task

In [14]:
# Run the same lookup for key '234fr' again to confirm the delete:
# the result is expected to contain no rows.
todelete = (
    spark.read.format("hudi")
    .load(basePath)
    .where("_hoodie_record_key = '234fr'")
)

todelete.show()

+-------------------+--------------------+------------------+----------------------+-----------------+---+-----+---+------+----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|state|age|salary|date|
+-------------------+--------------------+------------------+----------------------+-----------------+---+-----+---+------+----+
+-------------------+--------------------+------------------+----------------------+-----------------+---+-----+---+------+----+