In [1]:
import os
import datetime

import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# Configure (but do not start yet) a Spark session builder that pulls in the Hudi bundle
# and registers Hudi's SQL extension, catalog and Kryo registrar.
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
)

In [None]:
spark = builder.getOrCreate()
# Starting the Spark process can take a little while.
# Wait until some output appears below, which indicates that Spark has fully started.

In [None]:
# Display the session object to confirm that it was created.
spark

In [5]:
tblname = "tripstbl"
# Local filesystem location of the table: <current working directory>/out/<tblname>.
# The directory name is derived from tblname so that the Hudi table name and its
# on-disk location cannot drift apart when tblname is changed.
tblpath = "file://" + os.getcwd() + "/out/" + tblname
# Shortcut to the SparkContext of the running session.
sc = spark.sparkContext

In [6]:
# Generate sample trip records with Hudi's QuickstartUtils helper, reached through the JVM gateway.
quickstart_utils = sc._jvm.org.apache.hudi.QuickstartUtils
data_gen = quickstart_utils.DataGenerator()
generated_inserts = data_gen.generateInserts(10)
# Convert the generated records into a list of strings.
data_inserts = quickstart_utils.convertToStringList(generated_inserts)

In [None]:
# Inspect the generated insert records.
data_inserts

In [8]:
# The records are held in a py4j JavaList rather than a native Python list.
type(data_inserts)

py4j.java_collections.JavaList

In [None]:
# Load the generated records into a PySpark DataFrame, parsing them as JSON.
# Give Spark a moment to finish; a loading progress bar should appear as the cell output.
inserts_rdd = sc.parallelize(data_inserts, 2)
df = spark.read.json(inserts_rdd)

In [10]:
# Confirm that the result is a PySpark DataFrame.
type(df)

pyspark.sql.dataframe.DataFrame

In [11]:
# The DataFrame repr lists the column names and types derived from the JSON records.
df

DataFrame[begin_lat: double, begin_lon: double, driver: string, end_lat: double, end_lon: double, fare: double, partitionpath: string, rider: string, ts: bigint, uuid: string]

In [12]:
# Print the DataFrame schema as a tree.
df.printSchema()

root
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)



In [13]:
# Preview the generated rows; long string values are shown truncated.
df.show()

+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+-------------+--------------------+
|          begin_lat|          begin_lon|    driver|            end_lat|            end_lon|              fare|       partitionpath|    rider|           ts|                uuid|
+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+-------------+--------------------+
| 0.4726905879569653|0.46157858450465483|driver-213|  0.754803407008858| 0.9671159942018241|34.158284716382845|americas/brazil/s...|rider-213|1722623758034|a9cbd6bd-af10-47a...|
| 0.6100070562136587| 0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655|  43.4923811219014|americas/brazil/s...|rider-213|1722567803259|1d18935b-2d62-4b5...|
| 0.5731835407930634| 0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016

In [14]:
# Next, write the in-memory DataFrame out as a Hudi table.
# The following cells insert into and then update a Hudi table of the default table type,
# Copy on Write: https://hudi.apache.org/docs/table_types/#copy-on-write-table
#
# Note how the raw data fields "uuid", "ts" and "partitionpath" are mapped onto the
# table options for Hudi's mandatory keys: https://hudi.apache.org/docs/writing_data/

hudi_options = {
    # table identity
    "hoodie.table.name": tblname,
    "hoodie.datasource.write.table.name": tblname,
    # key fields taken from the generated records
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "partitionpath",
    "hoodie.datasource.write.precombine.field": "ts",
    # write operation and shuffle parallelism
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2,
}

In [None]:
# Save the DataFrame as a Hudi table at tblpath using "overwrite" mode.
# Give Spark a moment to finish; a loading progress bar should appear as the cell output.
(
    df.write.format("hudi")
    .options(**hudi_options)
    .mode("overwrite")
    .save(tblpath)
)

In [16]:
# After the write, a 'tripstbl' folder should exist inside the 'out' directory.
!ls -al out/

total 0
drwxr-xr-x@  5 sklin  staff  160  3 Aug 15:50 [1m[36m.[m[m
drwxr-xr-x@ 15 sklin  staff  480  3 Aug 15:49 [1m[36m..[m[m
-rw-r--r--@  1 sklin  staff    0 16 Sep  2022 .placeholder
drwxr-xr-x@  7 sklin  staff  224  3 Aug 15:37 [1m[36mmytbl[m[m
drwxr-xr-x@  5 sklin  staff  160  3 Aug 15:50 [1m[36mtripstbl[m[m


In [17]:
# Now read the Hudi table back from tblpath.
trips_df = spark.read.format("hudi").load(tblpath)

In [18]:
# Besides the trip columns, the table read back exposes Hudi's _hoodie_* metadata columns.
trips_df

DataFrame[_hoodie_commit_time: string, _hoodie_commit_seqno: string, _hoodie_record_key: string, _hoodie_partition_path: string, _hoodie_file_name: string, begin_lat: double, begin_lon: double, driver: string, end_lat: double, end_lon: double, fare: double, rider: string, ts: bigint, uuid: string, partitionpath: string]

In [19]:
# Print the schema of the table as read back, including the _hoodie_* metadata columns.
trips_df.printSchema()

root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)
 |-- partitionpath: string (nullable = true)



In [None]:
# Preview the first two rows of the table.
trips_df.show(2)

In [21]:
# List the (column name, type) pairs of the table.
trips_df.dtypes

[('_hoodie_commit_time', 'string'),
 ('_hoodie_commit_seqno', 'string'),
 ('_hoodie_record_key', 'string'),
 ('_hoodie_partition_path', 'string'),
 ('_hoodie_file_name', 'string'),
 ('begin_lat', 'double'),
 ('begin_lon', 'double'),
 ('driver', 'string'),
 ('end_lat', 'double'),
 ('end_lon', 'double'),
 ('fare', 'double'),
 ('rider', 'string'),
 ('ts', 'bigint'),
 ('uuid', 'string'),
 ('partitionpath', 'string')]

In [22]:
# Register the table as a temporary view named 'trips_snapshot' so that it can be queried with spark.sql(...).
trips_df.createOrReplaceTempView("trips_snapshot")

In [23]:
# Count the rows in the view; this should match the 10 generated inserts.
spark.sql("SELECT COUNT(*) FROM trips_snapshot").show()

+--------+
|count(1)|
+--------+
|      10|
+--------+



In [24]:
# Select a few columns for the trips whose fare is above 20.0.
spark.sql("SELECT fare, begin_lon, begin_lat, ts FROM trips_snapshot WHERE fare > 20.0").show()

+------------------+-------------------+-------------------+-------------+
|              fare|          begin_lon|          begin_lat|           ts|
+------------------+-------------------+-------------------+-------------+
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|1722310812455|
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1722461572329|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1722630280616|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|1722287748128|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|1722105576252|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|1722623758034|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|1722567803259|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|1722450739859|
+------------------+-------------------+-------------------+-------------+



In [25]:
# Show Hudi's per-record metadata (commit time, record key, partition path) next to some trip fields.
spark.sql("SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM trips_snapshot").show()

+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|    rider|    driver|              fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
|  20240803155056774|95fe4c32-0bff-4bf...|  americas/united_s...|rider-213|driver-213|19.179139106643607|
|  20240803155056774|d191f101-57b3-4ab...|  americas/united_s...|rider-213|driver-213| 27.79478688582596|
|  20240803155056774|5fbea174-901d-4ff...|  americas/united_s...|rider-213|driver-213| 64.27696295884016|
|  20240803155056774|d99e45e7-5351-472...|  americas/united_s...|rider-213|driver-213| 33.92216483948643|
|  20240803155056774|2ae71fa1-9e60-4d3...|  americas/united_s...|rider-213|driver-213| 93.56018115236618|
|  20240803155056774|cfe2e88b-92af-4b6...|  americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
|  20240803155056774|a9cbd6bd-af10-47a...|  am

In [26]:
# Fetch the row with the largest 'ts' value and keep that value as the cut-off timestamp.
latest_ts_rows = spark.sql("SELECT ts FROM trips_snapshot ORDER BY ts DESC LIMIT 1").collect()
cut_off_ts_row = latest_ts_rows[0]
cut_off_ts = cut_off_ts_row["ts"]
cut_off_ts

1722630280616

In [27]:
# Build the date string used as the time-travel instant.
#
# 'cut_off_ts' is an epoch-milliseconds value taken from the generated data, and the generated
# 'ts' values are spread over several days. The "as.of.instant" option, however, is matched
# against the table's commit instants, so a cut-off derived from the data alone can end up
# earlier than the commit that wrote the rows. Because that commit has already happened, never
# let the base of the cut-off be earlier than the current time.
record_dt = datetime.datetime.fromtimestamp(cut_off_ts / 1000.0)
cut_off_dt = max(record_dt, datetime.datetime.now())
# Add a two-day margin; formatting as a date below truncates the value to midnight.
cut_off_dt = cut_off_dt + datetime.timedelta(days=2)
cut_off_dt_str = cut_off_dt.strftime('%Y-%m-%d')
cut_off_dt_str

'2024-08-05'

In [28]:
# Time travel query: read the table as of a given instant through the "as.of.instant" option.
# trips_tt_df = spark.read.format("hudi").option("as.of.instant", "20210728141108").load(tblpath)

# Alternative instant formats:
# trips_tt_df = spark.read.format("hudi").option("as.of.instant", "2021-07-28 14:11:08").load(tblpath)

# A date-only value is equivalent to "as.of.instant = 2021-07-28 00:00:00":
# trips_tt_df = spark.read.format("hudi").option("as.of.instant", "2021-07-28").load(tblpath)

# Here the instant is the date string held in cut_off_dt_str.
trips_tt_df = spark.read.format("hudi").option("as.of.instant", cut_off_dt_str).load(tblpath)

In [29]:
# Count the rows visible as of the cut-off instant.
trips_tt_df.count()

10

In [30]:
# Update data
# Query selecting the trip columns (without the _hoodie_* metadata columns) from the 'trips_snapshot' view.
snapshot_query = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM trips_snapshot"

In [31]:
# Run the snapshot query before any update is written and count its rows.
snapshot_before_update = spark.sql(snapshot_query)
snapshot_before_update.count()

10

In [32]:
# Ask the same data generator for 10 update records and convert them into a list of strings.
generated_updates = data_gen.generateUpdates(10)
data_updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(generated_updates)

In [33]:
# Inspect the generated update records; note that a uuid can appear more than once.
data_updates

['{"ts": 1722071353169, "uuid": "5fbea174-901d-4fff-94d9-90b71594a86c", "rider": "rider-284", "driver": "driver-284", "begin_lat": 0.7340133901254792, "begin_lon": 0.5142184937933181, "end_lat": 0.7814655558162802, "end_lon": 0.6592596683641996, "fare": 49.527694252432056, "partitionpath": "americas/united_states/san_francisco"}, "partitionpath": "americas/united_states/san_francisco"}', '{"ts": 1722420860353, "uuid": "a9cbd6bd-af10-47a1-a62a-0cb62bfcc6c3", "rider": "rider-284", "driver": "driver-284", "begin_lat": 0.1593867607188556, "begin_lon": 0.010872312870502165, "end_lat": 0.9808530350038475, "end_lon": 0.7963756520507014, "fare": 29.47661370147079, "partitionpath": "americas/brazil/sao_paulo"}, "partitionpath": "americas/brazil/sao_paulo"}', '{"ts": 1722557779842, "uuid": "a9cbd6bd-af10-47a1-a62a-0cb62bfcc6c3", "rider": "rider-284", "driver": "driver-284", "begin_lat": 0.7180196467760873, "begin_lon": 0.13755354862499358, "end_lat": 0.3037264771699937, "end_lon": 0.253904715505

In [34]:
# Load the update records into a DataFrame.
# NOTE: this rebinds 'df', which until now held the insert batch.
df = spark.read.json(sc.parallelize(data_updates, 2))

In [35]:
# Show the column names and types of the update DataFrame.
df

DataFrame[begin_lat: double, begin_lon: double, driver: string, end_lat: double, end_lon: double, fare: double, partitionpath: string, rider: string, ts: bigint, uuid: string]

In [36]:
# Count the update records.
df.count()

10

In [None]:
# Write the update batch to the existing Hudi table at tblpath using "append" mode.
# Give Spark a moment to finish; a loading progress bar should appear as the cell output.
(
    df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save(tblpath)
)

In [38]:
# validations
assert spark.sql(snapshot_query).count() == 10
assert df.count() == 10

# 'trips_snapshot' was registered before the append, and its query output further down in this
# notebook still shows the pre-update rows, so the check above does not exercise the write that
# just ran. Read the table again as well: the generated updates reuse uuids of existing rows,
# so the row count must still be 10 after the upsert.
assert spark.read.format("hudi").load(tblpath).count() == 10, "row count changed after the update"

In [39]:
# Count the rows shared between the snapshot query result and the update batch.
# The result may vary from run to run because it depends on the randomly generated 'data_updates'.
# NOTE(review): 'trips_snapshot' was registered before the append, and the query outputs further
# down suggest it still serves the pre-update rows — confirm this is the intended comparison.
spark.sql(snapshot_query).intersect(df).count()

0

In [40]:
# validations
# The rows served by the 'trips_snapshot' view share nothing with the update batch.
assert spark.sql(snapshot_query).intersect(df).count() == 0

# The check above runs against a view registered before the append, so on its own it does not
# show that the updates reached the table. Compare the update batch with a fresh read as well.
# intersect() matches columns by position, hence the select in df's column order.
trips_after_update = spark.read.format("hudi").load(tblpath).select(*df.columns)
updates_found = trips_after_update.intersect(df).count()
# Several generated updates can target the same uuid while the table keeps one row per key,
# so expect at least one match rather than exactly df.count().
assert 0 < updates_found <= df.count(), "no generated update was found in the table"

In [41]:
# Reload 'trips_df' so that it reflects the current state of the table.
trips_df = spark.read.format("hudi").load(tblpath)

In [42]:
# Count the rows of the reloaded table.
trips_df.count()

10

In [43]:
# Print the schema of the reloaded table.
trips_df.printSchema()

root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)
 |-- partitionpath: string (nullable = true)



In [44]:
# Register the reloaded table as a separate temporary view named 'trips_updated'.
trips_df.createOrReplaceTempView("trips_updated")

In [45]:
# Trips as seen through the 'trips_snapshot' view, ordered by uuid (descending).
spark.sql("SELECT uuid, ts, partitionpath, rider, driver, fare, begin_lon, begin_lat, end_lon, end_lat FROM trips_snapshot ORDER BY uuid DESC").show()

+--------------------+-------------+--------------------+---------+----------+------------------+-------------------+-------------------+-------------------+-------------------+
|                uuid|           ts|       partitionpath|    rider|    driver|              fare|          begin_lon|          begin_lat|            end_lon|            end_lat|
+--------------------+-------------+--------------------+---------+----------+------------------+-------------------+-------------------+-------------------+-------------------+
|d99e45e7-5351-472...|1722630280616|americas/united_s...|rider-213|driver-213| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.25252652214479043|0.38186367037201974|
|d191f101-57b3-4ab...|1722310812455|americas/united_s...|rider-213|driver-213| 27.79478688582596| 0.6273212202489661|0.11488393157088261| 0.3954939864908973| 0.7454678537511295|
|cfe2e88b-92af-4b6...|1722105576252|americas/brazil/s...|rider-213|driver-213| 66.62084366450246|0.03844104444

In [46]:
# The same columns from the 'trips_updated' view, ordered the same way for a side-by-side comparison.
spark.sql("SELECT uuid, ts, partitionpath, rider, driver, fare, begin_lon, begin_lat, end_lon, end_lat FROM trips_updated ORDER BY uuid DESC").show()

+--------------------+-------------+--------------------+---------+----------+------------------+-------------------+--------------------+-------------------+------------------+
|                uuid|           ts|       partitionpath|    rider|    driver|              fare|          begin_lon|           begin_lat|            end_lon|           end_lat|
+--------------------+-------------+--------------------+---------+----------+------------------+-------------------+--------------------+-------------------+------------------+
|d99e45e7-5351-472...|1722519536930|americas/united_s...|rider-284|driver-284| 2.375516772415698|0.42849372303000655|0.014159831486388885| 0.9451993293955782|0.9968531966280192|
|d191f101-57b3-4ab...|1722518496018|americas/united_s...|rider-284|driver-284|  98.3428192817987| 0.3349917833248327|  0.4777395067707303| 0.8144901865212508|0.9735699951963335|
|cfe2e88b-92af-4b6...|1722293128623|americas/brazil/s...|rider-284|driver-284| 63.72504913279929|  0.888493603