In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = (SparkSession.builder.appName("Hudi_Data_Processing_Framework")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.hive.convertMetastoreParquet", "false")
        .config("spark.jars.packages",
            "org.apache.hudi:hudi-spark-bundle_2.11:0.8.0,org.apache.hudi:hudi-spark-bundle_2.11:0.5.3,org.apache.spark:spark-avro_2.11:2.4.5")
        .getOrCreate())


In [2]:
input_df = spark.createDataFrame(
    [
        (100, "2015-01-01", "2015-01-01T13:51:39.340396Z", 10),
        (101, "2015-01-01", "2015-01-01T12:14:58.597216Z", 10),
        (102, "2015-01-01", "2015-01-01T13:51:40.417052Z", 10),
        (103, "2015-01-01", "2015-01-01T13:51:40.519832Z", 10),
        (104, "2015-01-02", "2015-01-01T12:15:00.512679Z", 10),
        (104, "2015-01-02", "2015-01-01T12:15:00.512679Z", 10),
        (104, "2015-01-02", "2015-01-02T12:15:00.512679Z", 20),
        (105, "2015-01-02", "2015-01-01T13:51:42.248818Z", 10),
    ],
    ("acctid", "date", "ts", "deposit"),
)

In [3]:
input_df.show()

+------+----------+--------------------+-------+
|acctid|      date|                  ts|deposit|
+------+----------+--------------------+-------+
|   100|2015-01-01|2015-01-01T13:51:...|     10|
|   101|2015-01-01|2015-01-01T12:14:...|     10|
|   102|2015-01-01|2015-01-01T13:51:...|     10|
|   103|2015-01-01|2015-01-01T13:51:...|     10|
|   104|2015-01-02|2015-01-01T12:15:...|     10|
|   104|2015-01-02|2015-01-01T12:15:...|     10|
|   104|2015-01-02|2015-01-02T12:15:...|     20|
|   105|2015-01-02|2015-01-01T13:51:...|     10|
+------+----------+--------------------+-------+



In [4]:
input_df.printSchema()

root
 |-- acctid: long (nullable = true)
 |-- date: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- deposit: long (nullable = true)



In [16]:
hudi_options = {
    # ---------------DATA SOURCE WRITE CONFIGS---------------#
    "hoodie.table.name": "hudi_acct",
    "hoodie.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "acctid",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "date",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.upsert.shuffle.parallelism": 8,
    "hoodie.insert.shuffle.parallelism": 8,
    "hoodie.delete.shuffle.parallelism": 8,
#     "hoodie.consistency.check.enabled": True,
#     "hoodie.index.type": "BLOOM",
#     "hoodie.index.bloom.num_entries": 60000,
#     "hoodie.index.bloom.fpp": 0.000000001,
#     "hoodie.cleaner.commits.retained": 2,
}

hudi_dataset = 'F:\PyCharm_Projects\spark\hudi\op\\acct'

In [17]:
hudi_options

{'hoodie.table.name': 'hudi_acct',
 'hoodie.table.type': 'MERGE_ON_READ',
 'hoodie.datasource.write.operation': 'upsert',
 'hoodie.datasource.write.recordkey.field': 'acctid',
 'hoodie.datasource.write.precombine.field': 'ts',
 'hoodie.datasource.write.partitionpath.field': 'date',
 'hoodie.datasource.write.hive_style_partitioning': 'true',
 'hoodie.upsert.shuffle.parallelism': 8,
 'hoodie.insert.shuffle.parallelism': 8,
 'hoodie.delete.shuffle.parallelism': 8}

In [18]:
# INSERT
(
    input_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save(hudi_dataset)
)

In [19]:
# READ
output_df = spark.read.format("org.apache.hudi").load(hudi_dataset+"/*/*")
output_df.show()

+-------------------+--------------------+------------------+----------------------+--------------------+------+----------+--------------------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|acctid|      date|                  ts|deposit|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----------+--------------------+-------+
|     20210711181633| 20210711181633_0_16|               101|       date=2015-01-01|1cae8dd6-14ba-40a...|   101|2015-01-01|2015-01-01T12:14:...|     10|
|     20210711181633| 20210711181633_0_17|               100|       date=2015-01-01|1cae8dd6-14ba-40a...|   100|2015-01-01|2015-01-01T13:51:...|     10|
|     20210711181633| 20210711181633_0_18|               103|       date=2015-01-01|1cae8dd6-14ba-40a...|   103|2015-01-01|2015-01-01T13:51:...|     10|
|     20210711181633| 20210711181633_0_19|               102|       date=2015-01-0

In [20]:
update_df = spark.createDataFrame(
    [
        (100, "2015-01-01", "2015-01-01T13:51:39.340396Z", 20),
#         (201, "2015-01-01", "2015-01-01T12:14:58.597216Z", 10),
#         (202, "2015-01-01", "2015-01-01T13:51:40.417052Z", 10),
#         (203, "2015-01-01", "2015-01-01T13:51:40.519832Z", 10),
#         (204, "2015-01-02", "2015-01-01T12:15:00.512679Z", 10),
#         (204, "2015-01-03", "2015-01-01T12:15:00.512679Z", 10),
#         (204, "2015-01-04", "2015-01-02T12:15:00.512679Z", 20),
#         (205, "2015-01-02", "2015-01-01T13:51:42.248818Z", 10),
    ],
    ("acctid", "date", "ts", "deposit"),
)

In [10]:
update_df.show()

+------+----------+--------------------+-------+
|acctid|      date|                  ts|deposit|
+------+----------+--------------------+-------+
|   100|2015-01-01|2015-01-01T13:51:...|     20|
+------+----------+--------------------+-------+



In [21]:
# UPDATE
(
    update_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save(hudi_dataset)
)

In [31]:
# READ
output_df = spark.read.format("org.apache.hudi").load(hudi_dataset+"/*/*")
output_df.show()

+-------------------+--------------------+------------------+----------------------+--------------------+------+----------+--------------------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|acctid|      date|                  ts|deposit|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----------+--------------------+-------+
|     20210710182316|  20210710182316_1_1|               105|       date=2015-01-02|47b34019-f7d2-479...|   105|2015-01-02|2015-01-01T13:51:...|     10|
|     20210710182316|  20210710182316_1_3|               104|       date=2015-01-02|47b34019-f7d2-479...|   104|2015-01-02|2015-01-02T12:15:...|     20|
|     20210710183022|  20210710183022_0_7|               100|       date=2015-01-01|eda4dbef-2e4c-4fe...|   100|2015-01-01|2015-01-01T13:51:...|     20|
|     20210710182316|  20210710182316_0_5|               103|       date=2015-01-0

In [15]:
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': '20210710182316', # Will return commits after this value
}
# READ
output_df = spark.read.format("org.apache.hudi").options(**incremental_read_options).load(hudi_dataset)
output_df.show()

+-------------------+--------------------+------------------+----------------------+--------------------+------+----------+--------------------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|acctid|      date|                  ts|deposit|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----------+--------------------+-------+
|     20210710183022|  20210710183022_0_7|               100|       date=2015-01-01|eda4dbef-2e4c-4fe...|   100|2015-01-01|2015-01-01T13:51:...|     20|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----------+--------------------+-------+



In [38]:
hudi_delete_options = hudi_options.copy()
hudi_delete_options['hoodie.datasource.write.operation'] = 'delete'

hudi_delete_options

{'hoodie.table.name': 'hudi_acct',
 'hoodie.table.type': 'MERGE_ON_READ',
 'hoodie.datasource.write.operation': 'delete',
 'hoodie.datasource.write.recordkey.field': 'acctid',
 'hoodie.datasource.write.precombine.field': 'ts',
 'hoodie.datasource.write.partitionpath.field': 'date',
 'hoodie.datasource.write.hive_style_partitioning': 'true',
 'hoodie.upsert.shuffle.parallelism': 8,
 'hoodie.insert.shuffle.parallelism': 8,
 'hoodie.delete.shuffle.parallelism': 8}

In [48]:
# key and partition both are compulsory for delete to work
delete_df = spark.createDataFrame( [(102, "2015-01-01")], ("acctid", "date") ) 
delete_df.show()

+------+----------+
|acctid|      date|
+------+----------+
|   102|2015-01-01|
+------+----------+



In [49]:
# DELETE
(
    delete_df.write.format("org.apache.hudi")
    .options(**hudi_delete_options)
    .mode("append")
    .save(hudi_dataset)
)

In [50]:
# READ
output_df = spark.read.format("org.apache.hudi").load(hudi_dataset+"/*/*")
output_df.show()

+-------------------+--------------------+------------------+----------------------+--------------------+------+----------+--------------------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|acctid|      date|                  ts|deposit|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----------+--------------------+-------+
|     20210710182316|  20210710182316_0_5|               103|       date=2015-01-01|eda4dbef-2e4c-4fe...|   103|2015-01-01|2015-01-01T13:51:...|     10|
|     20210710182316|  20210710182316_1_1|               105|       date=2015-01-02|47b34019-f7d2-479...|   105|2015-01-02|2015-01-01T13:51:...|     10|
|     20210710182316|  20210710182316_1_3|               104|       date=2015-01-02|47b34019-f7d2-479...|   104|2015-01-02|2015-01-02T12:15:...|     20|
+-------------------+--------------------+------------------+---------------------