## 参考資料

- PySpark API Document: https://spark.apache.org/docs/latest/api/python/index.html
- SparkのpackagesをSparkSession作成時に指定する方法: https://qiita.com/junkor-1011/items/b12ec62f2615d068c1a5
- Hudiクイックスタート: https://hudi.apache.org/docs/0.7.0-quick-start-guide.html#pyspark-example
- HudiのHive同期(hive_sync)オプションについて(Glueデータカタログとの同期): https://dev.classmethod.jp/articles/apache-hudi-on-aws-sync-glue-data-catalog/

In [1]:
# 必要なSparkモジュールの読み込み
from pyspark.sql import SparkSession

In [2]:
# Create the SparkSession.
# - spark.jars.packages pulls the Hudi Spark bundle and spark-avro onto the
#   classpath at session start-up.
# - Hudi's quick-start configures the Kryo serializer.
# NOTE: if a session already exists in this kernel, getOrCreate() returns it
# and the package setting may not take effect; restart the kernel in that case.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)
# Low-level SparkContext: used for the JVM gateway (sc._jvm) and parallelize().
sc = spark.sparkContext

In [3]:
# Table settings: the Hudi table name and the local directory it is stored in.
tableName = "hudi_trips_cow"
basePath = f"file:///tmp/{tableName}"

# Sample-data generator shipped in the Hudi bundle, reached through the
# py4j JVM gateway of the SparkContext.
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()

In [4]:
# Insert: generate 10 sample trip records and write them as a new Hudi table.
# convertToStringList turns the generated records into JSON strings, which
# spark.read.json parses into a DataFrame (2 partitions).
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(sc.parallelize(inserts, 2))

# Hudi write options
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    # 'upsert' is used so the same options can be reused unchanged for the update step later.
    'hoodie.datasource.write.operation': 'upsert',
    # Per the Hudi docs, when two records share a key the one with the larger 'ts' is kept.
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    # Hive sync options. Disabled because Hive cannot be started in this environment.
    # partition_fields maps the three levels of 'partitionpath' (e.g. americas/<country>/<city>)
    # onto Hive partition columns via MultiPartKeysValueExtractor.
    # 'hoodie.datasource.hive_sync.enable': 'true',
    # 'hoodie.datasource.hive_sync.database': 'default',
    # 'hoodie.datasource.hive_sync.table': tableName,
    # 'hoodie.datasource.hive_sync.partition_fields': 'continent,country,city',
    # 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    # 'hoodie.datasource.hive_sync.use_jdbc': 'false',
}

# Write the data.
# mode("overwrite") creates the table from scratch; an existing table at basePath is replaced.
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)

In [5]:
# Inspect the table directory written above. Its top-level sub-directories are
# the first level of the 'partitionpath' values (e.g. americas/, asia/).
# NOTE: this path is hard-coded; it must match basePath ("file:///tmp/hudi_trips_cow").
%ls /tmp/hudi_trips_cow/

[0m[01;34mamericas[0m/  [01;34masia[0m/


In [6]:
# Snapshot query: load the current state of the Hudi table into a DataFrame.
# The glob covers the three partition directory levels plus the data files.
# A bare load(basePath) would rely on Spark's "/partitionKey=partitionValue"
# partition discovery, which this layout (plain "americas/..." directories)
# does not use.
tripsSnapshotDF = spark.read.format("hudi").load(f"{basePath}/*/*/*/*")

# Register the DataFrame under a name usable from SQL (similar to a SQL view).
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

# Trips costing more than 20, then the Hudi metadata columns for every row.
spark.sql(
    "SELECT fare, begin_lon, begin_lat, ts FROM hudi_trips_snapshot WHERE fare > 20.0"
).show()
spark.sql(
    "SELECT _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare FROM hudi_trips_snapshot"
).show()

+------------------+-------------------+-------------------+-------------+
|              fare|          begin_lon|          begin_lat|           ts|
+------------------+-------------------+-------------------+-------------+
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1613122976362|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|1613143614746|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1613290212227|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|1612919760094|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|1613406756346|
|34.158284716382845|0.46157858450465483| 0.4726905879569653|1613220467917|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|1613098090130|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|1613250211029|
+------------------+-------------------+-------------------+-------------+

+-------------------+--------------------+----------------------+---------+----------+-------------

In [7]:
# Update: generate updated versions of 10 records and upsert them into the table.
# (The uuids in the later output match the inserted ones, so these target existing keys.)
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(sc.parallelize(updates, 2))

# mode("append") adds a new commit instead of replacing the table. Because
# hudi_options sets the operation to 'upsert', existing keys are updated
# rather than duplicated.
(
    df.write
    .format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save(basePath)
)

In [8]:
# Verify the update.
# NOTE: the "hudi_trips_snapshot" temp view was registered from a DataFrame
# loaded *before* the upsert. Its SQL output still shows the pre-update rows:
# same fares/ts as the insert step, and every rider is rider-213. So the table
# is reloaded and the view is re-registered to make SQL see the new commit.
from pyspark.sql.functions import col

print("BEFORE (view registered before the update - stale):")
spark.sql("SELECT uuid, fare, rider, driver, ts FROM hudi_trips_snapshot ORDER BY uuid").show()

# Re-read the table so the latest commit is included.
updatedDF = spark. \
  read. \
  format("hudi"). \
  load(basePath + "/*/*/*/*")
# Point the SQL view at the fresh data as well.
updatedDF.createOrReplaceTempView("hudi_trips_snapshot")

print("AFTER (reloaded):")
updatedDF.orderBy(col("uuid")).select("uuid", "fare", "rider", "driver", "ts").show()

+--------------------+------------------+---------+----------+-------------+
|                uuid|              fare|    rider|    driver|           ts|
+--------------------+------------------+---------+----------+-------------+
|37333502-cc20-4a7...|34.158284716382845|rider-213|driver-213|1613220467917|
|4852c8c8-a53c-4f8...| 41.06290929046368|rider-213|driver-213|1613250211029|
|9520f139-7b96-4b4...| 33.92216483948643|rider-213|driver-213|1613290212227|
|bf7e79f3-9e47-4b9...|19.179139106643607|rider-213|driver-213|1613230728881|
|c7fefb7c-9b2a-453...| 64.27696295884016|rider-213|driver-213|1613122976362|
|cb88a571-850b-4a3...|17.851135255091155|rider-213|driver-213|1612922184633|
|cefc00b9-bf58-4bd...| 93.56018115236618|rider-213|driver-213|1612919760094|
|d15af8f5-df63-4db...| 66.62084366450246|rider-213|driver-213|1613098090130|
|ecd16b57-b100-462...|  43.4923811219014|rider-213|driver-213|1613406756346|
|f5d8dbb0-e773-4e3...| 27.79478688582596|rider-213|driver-213|1613143614746|

In [9]:
# Conditional expressions with when / otherwise.
from pyspark.sql.functions import col, when, lit
# Flag rows whose rider is "rider-213".
# Boolean literals (not the strings "true"/"false") make this a BooleanType
# column, which matches the "boolean" column name used below.
when_query = when(col("rider") == "rider-213", lit(True)).otherwise(lit(False)).alias("is 213")
# Use the expression in a SELECT
print("SELECT: ")
updatedDF.select("uuid", "rider", when_query).show()
# Add a column. The name given to withColumn ("boolean") overrides the alias "is 213".
print("ADD COLUMN: ")
updatedDF.withColumn("boolean", when_query).select("uuid", "rider", "boolean").show()
# DataFrames are immutable: every transformation returns a new DataFrame,
# so updatedDF's own column list is unchanged.
print(updatedDF.columns)

# Replace an existing column: withColumn with an existing name overwrites it.
# NOTE: the result's "driver" column becomes a boolean flag instead of a string.
print("UPDATE: BEFORE:")
updatedDF.select("uuid", "rider", "driver").show()
print("UPDATE: AFTER:")
updatedDF.withColumn("driver", when_query).select("uuid", "rider", "driver").show()

SELECT: 
+--------------------+---------+------+
|                uuid|    rider|is 213|
+--------------------+---------+------+
|c7fefb7c-9b2a-453...|rider-284| false|
|bf7e79f3-9e47-4b9...|rider-213|  true|
|f5d8dbb0-e773-4e3...|rider-284| false|
|9520f139-7b96-4b4...|rider-284| false|
|cefc00b9-bf58-4bd...|rider-213|  true|
|4852c8c8-a53c-4f8...|rider-284| false|
|cb88a571-850b-4a3...|rider-284| false|
|ecd16b57-b100-462...|rider-213|  true|
|37333502-cc20-4a7...|rider-284| false|
|d15af8f5-df63-4db...|rider-284| false|
+--------------------+---------+------+

ADD COLUMN: 
+--------------------+---------+-------+
|                uuid|    rider|boolean|
+--------------------+---------+-------+
|c7fefb7c-9b2a-453...|rider-284|  false|
|bf7e79f3-9e47-4b9...|rider-213|   true|
|f5d8dbb0-e773-4e3...|rider-284|  false|
|9520f139-7b96-4b4...|rider-284|  false|
|cefc00b9-bf58-4bd...|rider-213|   true|
|4852c8c8-a53c-4f8...|rider-284|  false|
|cb88a571-850b-4a3...|rider-284|  false|
|ecd16b