# Hudi example<br>




In [1]:
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.QuickstartUtils._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import scala.collection.JavaConversions._

In [2]:
// olist_orders dataset
val datasetPath = "s3://kiyoung-data-us-east-1/dataset/olist_orders_dataset"
val df = spark.read.load(datasetPath)

In [3]:
// 데이터는 잘 있습니다.
df.show

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|995392413cee61cc1...|4bf24904ec428325a...|   delivered|     2017-09-04 13:24:05|2017-09-04 13:43:54|         2017-09-13 08:20:04|          2017-09-22 12:09:32|          2017-09-26 15:00:00|
|b39de9ed2bb8fd08a...|ed18b557140ff674f...|   delivered|     2018-03-27 00:15:17|2018-03-27 00:30:12|         2018-03-27 14:52:09|          2018-04-11 12:51:02|          2018-04-25 15:00:00|
|b1a88554eb1f7f686...|5cf799d0ac88e1d32...|  

In [4]:
// 스키마는 이렇게 생겼습니다.
df.printSchema

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [5]:
// hudi 데이터를 저장할 path 와 table 이름 입니다.
val basePath = "s3://kiyoung-data-store-us-east-1/hudi/olist_orders_dataset_cow"
val dbName = "hudi"
val tableName = "olist_orders_cow"

// 이 녀석은 예제에 많이 나오는 녀석인데, 별 것은 아니고, hoodie의 parallelism 셋팅 입니다.
getQuickstartWriteConfigs

{hoodie.upsert.shuffle.parallelism=2, hoodie.insert.shuffle.parallelism=2}

In [6]:
// insert
// 쓰잘데기 없이 오래 걸립니다.
// 이유는 파티션 때문인데, order_purchase_timestamp 키를 파티션을 나누며, 데이터가 timestamp 이다 보니 많은 수의 partition이 생성 됩니다.
// 만약 제가 실행하면 말려 주십시오.
df.write
  .format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(RECORDKEY_FIELD_OPT_KEY, "order_id")
  .option(PRECOMBINE_FIELD_OPT_KEY, "order_purchase_timestamp")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "order_purchase_timestamp")
  .option(TABLE_NAME, tableName)
  .mode(Overwrite)
  .save(basePath)

/*
RECORDKEY_FIELD_OPT_KEY
  - Record key field 로 PK라고 보시면 됩니다. 같은 value가 있을 경우 이 키를 기준으로 update/delete 등을 결정합니다.
PRECOMBINE_FIELD_OPT_KEY
  - 위의 RECORDKEY_FIELD_OPT_KEY 를 기준으로 두 개의 같은 row가 있을 경우 이 키를 기준으로 큰 값을 최신 데이터로 결정합니다.
  - 쉽게 같은 값이 있을 경우 이 키로 어떤 값을 쓸지 결정 합니다.
  - rdb 에서 보통 created_at, updated_at 으로 row가 생성 된 일자, update 된 일자를 기록하는데 updated_at 으로 가장 최신의 데이터를 결장하는 방식입니다.
PARTITIONPATH_FIELD_OPT_KEY
  - 파티션 키로 이 필드를 기준으로 파티션을 생성합니다.
  - hudi 로 업데이틑 한다고 하면, 전체 데이터에서 같은 key를 찾아 data를 업데이트 하긴 어렵습니다.
  - 그렇게 때문에 Hudi는 Partition을 내에서 데이터를 찾으며, 이를 강제하고 있습니다.
*/ 

java.lang.InterruptedException: Execution was interrupted by the user

In [7]:
// ** Insert **
// 항상 강조하지만 파티션은 소중합니다.
// 파티션용으로 쓸 컬럼을 order_purchase_timestamp 에서 일자 부분만 잘라 order_purchase_date 컬럼으로 만들어 주도록 하겠습니다.
// 소중한 시간을 위해 몇개의 일자만 필터링 하도록 하겠습니다.
// 어서 좋은 랩탑이 왔으면 좋겠습니다. -> 왔습니다. 짠.
df.withColumn("order_purchase_date", regexp_replace(date_format(to_date(col("order_purchase_timestamp")), "yyyy-MM-dd"), "-", "/"))
  .filter("order_purchase_date='2018/05/28' or order_purchase_date='2017/09/11' or order_purchase_date='2018/08/10' or order_purchase_date='2017/01/06'")
  .write
  .format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(RECORDKEY_FIELD_OPT_KEY, "order_id")
  .option(PRECOMBINE_FIELD_OPT_KEY, "order_purchase_timestamp")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "order_purchase_date")
  .option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(HIVE_PARTITION_FIELDS_OPT_KEY, "order_purchase_date")
  .option(HIVE_DATABASE_OPT_KEY, dbName)
  .option(HIVE_TABLE_OPT_KEY, tableName)
  .option(TABLE_NAME, tableName)
  .mode(Overwrite)
  .save(basePath)

// 이렇게 생성 된 Hudi 데이터는 Hive, Presto, Spark 에서 읽으실 수 있습니다.
// https://hudi.apache.org/docs/querying_data.html

// Athena는 Hudi를 지원하지 않는다고 나와 있습니다.
// https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hudi-considerations.html

// 그러나!! 이 demo를 만들면서 테스트 해 봤더니 지원합니다. (us-east-1)
// 7/15일까지 compact version에 대한 read를 지원한다고 합니다.
// https://answers.amazon.com/questions/124938

// Athena와 Hive 에서 쿼리 해 보겠습니다.

In [8]:
// 저장 된 데이터를 로드해 볼까요?
val hudiDF = spark
  .read
  .format("hudi")
  .load(basePath + "/*/*/*")

In [9]:
// 참 쉽죠~
hudiDF.show(false)

+-------------------+--------------------+--------------------------------+----------------------+----------------------------------------------------------------------+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key              |_hoodie_partition_path|_hoodie_file_name                                                     |order_id                        |customer_id                     |order_status|order_purchase_timestamp|order_approved_at  |order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_purchase_date|
+-------------------+--------------------+--------------------------------+----------------------+----------------------------------------------------------------------+-------------------------

In [10]:
// spark sql도 잘 됩니다.
spark.sql("""
    select *
    from hudi.olist_orders_cow
    where order_purchase_date = '2017-01-06'
""").show(false)

+-------------------+--------------------+--------------------------------+----------------------+----------------------------------------------------------------------+--------------------------------+--------------------------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key              |_hoodie_partition_path|_hoodie_file_name                                                     |order_id                        |customer_id                     |order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_purchase_date|
+-------------------+--------------------+--------------------------------+----------------------+----------------------------------------------------------------------+-----------------------------

In [11]:
-- 여기서도 할 수 있습니다.
select *
from hudi.olist_orders_cow
where order_purchase_date = '2017-01-06'

[_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 12 more fields]

In [12]:
// update

// 데이터 로드
// 한 2017년 1월 6일의 데이터를 로드 하도록 하겠습니다.
val olistOrdersDF = spark.sql("""
    select *
    from hudi.olist_orders_cow
    where order_purchase_date = '2017-01-06'
""")

olistOrdersDF.show(false)

+-------------------+--------------------+--------------------------------+----------------------+-----------------------------------------------------------------------+--------------------------------+--------------------------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key              |_hoodie_partition_path|_hoodie_file_name                                                      |order_id                        |customer_id                     |order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_purchase_date|
+-------------------+--------------------+--------------------------------+----------------------+-----------------------------------------------------------------------+--------------------------

In [13]:
// 그 중 한 건의 데이터에 대해 order_status를 ordered 로 바꾸도록 합니다. (그 전엔 delivered)
val updatedOlistOrdersDF = olistOrdersDF
  .where("order_id = '157ec3dc3f38cdbd2706bd216edfe8fb'")
  .withColumn("order_status", lit("ordered"))

// update 합니다.
// SaveMode가 Append 로 바뀐 것을 보실 수 있습니다.
updatedOlistOrdersDF
  .withColumn("order_purchase_date", regexp_replace(col("order_purchase_date"), "-", "/"))
  .write
  .format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(RECORDKEY_FIELD_OPT_KEY, "order_id")
  .option(PRECOMBINE_FIELD_OPT_KEY, "order_purchase_timestamp")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "order_purchase_date")
  .option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(HIVE_PARTITION_FIELDS_OPT_KEY, "order_purchase_date")
  .option(HIVE_DATABASE_OPT_KEY, dbName)
  .option(HIVE_TABLE_OPT_KEY, tableName)
  .option(TABLE_NAME, tableName)
  .mode(Append)
  .save(basePath)

In [14]:
// incremental query

// commit time 들을 보실 수 있습니다.
// 처음 insert, 두번째 update 이렇게 두 번 write 하였기 때문에 2개가 나오는 것을 보실 수 있습니다.
val commitsDF = spark.sql("""
    select distinct(_hoodie_commit_time) as commitTime
    from hudi.olist_orders_cow
    order by commitTime
""")

commitsDF.show(false)

+--------------+
|commitTime    |
+--------------+
|20200701042831|
|20200701042913|
+--------------+



In [15]:
val commits = commitsDF.map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // 마지막 commit time을 가져 옵니다.

In [16]:
// 위에서 가져온 마지막 commit time을 가지고 데이터를 로드 하면 해당 commit에 대한 data만 보실 수 있습니다.
spark.read.format("hudi")
  .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(BEGIN_INSTANTTIME_OPT_KEY, beginTime)
  .load(basePath)
  .show(false)

+-------------------+--------------------+--------------------------------+----------------------+-----------------------------------------------------------------------+--------------------------------+--------------------------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key              |_hoodie_partition_path|_hoodie_file_name                                                      |order_id                        |customer_id                     |order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_purchase_date|
+-------------------+--------------------+--------------------------------+----------------------+-----------------------------------------------------------------------+--------------------------

In [17]:
// delete

// 삭제 할 데이터.
// 위에서 사용한 row 한개를 지우기 위해 데이터를 로드 합니다.
val tobeDeletedDF = spark.sql("""
    select *
    from hudi.olist_orders_cow
    where order_purchase_date = '2017-01-06'
    and order_id = '157ec3dc3f38cdbd2706bd216edfe8fb'
""")

tobeDeletedDF.show(false)

+-------------------+--------------------+--------------------------------+----------------------+-----------------------------------------------------------------------+--------------------------------+--------------------------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key              |_hoodie_partition_path|_hoodie_file_name                                                      |order_id                        |customer_id                     |order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_purchase_date|
+-------------------+--------------------+--------------------------------+----------------------+-----------------------------------------------------------------------+--------------------------

In [18]:
// 아까 업데이트에서 option(OPERATION_OPT_KEY,"delete") 만 추가 되었습니다.

tobeDeletedDF
  .withColumn("order_purchase_date", regexp_replace(col("order_purchase_date"), "-", "/"))
  .write
  .format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(OPERATION_OPT_KEY,"delete")
  .option(RECORDKEY_FIELD_OPT_KEY, "order_id")
  .option(PRECOMBINE_FIELD_OPT_KEY, "order_purchase_timestamp")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "order_purchase_date")
  .option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(HIVE_PARTITION_FIELDS_OPT_KEY, "order_purchase_date")
  .option(HIVE_DATABASE_OPT_KEY, dbName)
  .option(HIVE_TABLE_OPT_KEY, tableName)
  .option(TABLE_NAME, tableName)
  .mode(Append)
  .save(basePath)

In [19]:
// 해당 일자에서 데이터가 없어진 것을 보실 수 있습니다.
spark.sql("""
    select *
    from hudi.olist_orders_cow
    where order_purchase_date = '2017-01-06'
""").show(false)

+-------------------+--------------------+--------------------------------+----------------------+-----------------------------------------------------------------------+--------------------------------+--------------------------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+-------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key              |_hoodie_partition_path|_hoodie_file_name                                                      |order_id                        |customer_id                     |order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|order_purchase_date|
+-------------------+--------------------+--------------------------------+----------------------+-----------------------------------------------------------------------+--------------------------