In [None]:
%%configure -f
{
    "conf":  { 
             "spark.jars":"hdfs:///httpclient-4.5.9.jar,hdfs:///hudi-spark-bundle.jar,hdfs:///spark-avro.jar",
             "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
             "spark.sql.hive.convertMetastoreParquet":"false"
           } 
}

In [88]:
# Display the Spark version of the session this notebook is attached to.
spark.version

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'3.1.1-amzn-0'

In [89]:
# General Constants
# Data source name handed to DataFrameWriter.format().
HUDI_FORMAT = "org.apache.hudi"

# Table name and record-level write option keys.
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.common.model.EmptyHoodieRecordPayload"

# Write operation option key and the values used with it.
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
DELETE_OPERATION_OPT_VAL = "delete"

# Shuffle parallelism option keys, one per write operation.
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"

# Consistency-check and cleaner option keys / values.
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"

# Hive Constants
# Option keys under the "hoodie.datasource.hive_sync." prefix.
HIVE_SYNC_ENABLED_OPT_KEY = "hoodie.datasource.hive_sync.enable"
HIVE_TABLE_OPT_KEY = "hoodie.datasource.hive_sync.table"
HIVE_PARTITION_FIELDS_OPT_KEY = "hoodie.datasource.hive_sync.partition_fields"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY = "hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY = "hoodie.datasource.hive_sync.assume_date_partitioning"

# Partition Constants
# Hive-sync partition value extractor classes.
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL = "org.apache.hudi.hive.NonPartitionedExtractor"
# The "MULIPART" spelling is kept as-is because the write cells reference this name.
MULIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL = "org.apache.hudi.hive.MultiPartKeysValueExtractor"

# Key generator option key and class names.
KEYGENERATOR_CLASS_OPT_KEY = "hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL = "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
# Uses the same "org.apache.hudi.keygen" package as the non-partitioned
# generator above; the previous value pointed at "org.apache.hudi" directly.
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL = "org.apache.hudi.keygen.ComplexKeyGenerator"

# Option key naming the column that supplies each record's partition path.
PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [91]:
import random

## Generates Data - we are adding year and month columns this time.
def get_json_data(start, count, increment=0, seed=None):
    """Generate ``count`` sample records with ids ``start`` .. ``start + count - 1``.

    Each record is a dict with these keys, in this order:
      * ``id``    - the running integer id.
      * ``sk``    - ``id + increment``.
      * ``txt``   - an upper-case letter cycling A..Z with the id.
      * ``year``  - the fixed string "2019".
      * ``month`` - a random integer in 1..12.
      * ``_hoodie_is_deleted`` - the string "false".

    Args:
        start: first id to generate.
        count: number of records to generate.
        increment: offset added to the id to form ``sk``.
        seed: optional seed for the month values. With a seed, identical
            arguments always give identical records, so the notebook can be
            re-run reproducibly. With ``None`` (the default) the shared
            module-level ``random`` generator is used, exactly as before.

    Returns:
        A list of ``count`` dicts (empty when ``count`` is 0 or negative).
    """
    # A private generator keeps a seeded call from disturbing global random state.
    rng = random if seed is None else random.Random(seed)
    return [
        {
            "id": i,
            "sk": i + increment,
            "txt": chr(65 + (i % 26)),
            "year": "2019",
            "month": rng.randint(1, 12),
            # NOTE(review): this is the string "false", not a boolean - confirm
            # that is what the downstream delete flow expects.
            "_hoodie_is_deleted": "false",
        }
        for i in range(start, start + count)
    ]

# Creates the Dataframe
def create_json_df(spark, data):
    """Build a DataFrame by parsing ``data`` as JSON records.

    Records that are not already strings are serialised with ``json.dumps``
    before being handed to the JSON reader. Previously the Python objects were
    passed through unchanged, which only works while their text form happens
    to be parseable as JSON (it would not be for values such as ``None``,
    ``True`` or strings containing an apostrophe).

    Args:
        spark: the active SparkSession.
        data: iterable of records, each either a JSON string or a
            JSON-serialisable Python object such as a dict.

    Returns:
        The DataFrame produced by ``spark.read.json`` over a 2-partition RDD
        of the JSON lines.
    """
    import json  # local import so this cell does not depend on another cell's imports

    sc = spark.sparkContext
    json_lines = [
        # default=str keeps values json cannot encode natively from raising.
        rec if isinstance(rec, str) else json.dumps(rec, default=str)
        for rec in data
    ]
    return spark.read.json(sc.parallelize(json_lines, 2))

# Build the initial batch: 20 generated records with ids 0..19.
df1 = create_json_df(spark=spark, data=get_json_data(start=0, count=20))

# Print the row count, then preview the first five rows.
print(df1.count())
df1.show(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

20
+------------------+---+-----+---+---+----+
|_hoodie_is_deleted| id|month| sk|txt|year|
+------------------+---+-----+---+---+----+
|             false|  0|    1|  0|  A|2019|
|             false|  1|   10|  1|  B|2019|
|             false|  2|   10|  2|  C|2019|
|             false|  3|    6|  3|  D|2019|
|             false|  4|    3|  4|  E|2019|
+------------------+---+-----+---+---+----+
only showing top 5 rows

In [92]:
## CHANGE ME ##
# Settings read by the cells below:
#   table_name        - Hudi / Hive table name
#   target            - path the Hudi table is saved to
#   primary_key       - column used as the Hudi record key
#   sort_key          - column used as the Hudi precombine field
#   commits_to_retain - not referenced by the write cells visible in this notebook
#   partition_keys    - comma-separated Hive partition fields
config = dict(
    table_name="test_hudi_partitioned_table_upsert",
    target="s3://bucket/tmp/hudi/test_hudi_partitioned_table_upsert",
    primary_key="id",
    sort_key="sk",
    commits_to_retain="2",
    partition_keys="year,month",
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [94]:
# Start from a clean slate: drop the table if an earlier run left it behind.
drop_table_sql = "drop table if exists " + config['table_name']
spark.sql(drop_table_sql).show(n=100, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

++
||
++
++

In [95]:
from pyspark.sql.functions import concat, col, lit

# Name of the derived column that carries each record's partition path.
hudiTablePartitionKey = "partitionKey"

# Partition path in the form "year=<year>/month=<month>".
partition_path = concat(lit("year="), col("year"), lit("/month="), col("month"))
df1 = df1.withColumn(hudiTablePartitionKey, partition_path)

# Preview the derived values.
df1.select(hudiTablePartitionKey).show(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+
|      partitionKey|
+------------------+
| year=2019/month=1|
|year=2019/month=10|
|year=2019/month=10|
| year=2019/month=6|
| year=2019/month=3|
+------------------+
only showing top 5 rows

In [96]:
# Initial load: bulk_insert df1 into the Hudi table at config['target'],
# overwriting anything already there, with Hive sync enabled.
bulk_insert_options = {
    # Record identity and ordering.
    PRECOMBINE_FIELD_OPT_KEY: config["sort_key"],
    RECORDKEY_FIELD_OPT_KEY: config["primary_key"],
    TABLE_NAME: config['table_name'],
    # Write operation and its shuffle parallelism.
    OPERATION_OPT_KEY: BULK_INSERT_OPERATION_OPT_VAL,
    BULK_INSERT_PARALLELISM: 3,
    # Hive sync settings.
    HIVE_PARTITION_FIELDS_OPT_KEY: config["partition_keys"],
    HIVE_TABLE_OPT_KEY: config['table_name'],
    HIVE_SYNC_ENABLED_OPT_KEY: "true",
    HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: MULIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL,
    # Column holding the partition path of each record.
    PARTITIONPATH_FIELD_OPT_KEY: "partitionKey",
}

(df1.write.format(HUDI_FORMAT)
      .options(**bulk_insert_options)
      .mode("Overwrite")
      .save(config['target']))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [97]:
# Count the records per (year, month) partition, ordered by month.
partition_counts_sql = (
    "Select year, month, count(*) as num_records from "
    + config['table_name']
    + " group by year, month order by month"
)
spark.sql(partition_counts_sql).show(n=100, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----+-----------+
|year|month|num_records|
+----+-----+-----------+
|2019|1    |1          |
|2019|2    |1          |
|2019|3    |1          |
|2019|4    |1          |
|2019|5    |5          |
|2019|6    |3          |
|2019|7    |1          |
|2019|8    |2          |
|2019|10   |2          |
|2019|12   |3          |
+----+-----+-----------+

In [98]:
from pyspark.sql.functions import col,lit


# Take a single existing row from the table to act as the incoming change.
incoming_df = spark.sql("Select * from " + config['table_name'] + " limit 1")

# Drop every column whose name starts with "_hoodie_" before re-writing the row.
hoodie_metadata_columns = [
    name for name in incoming_df.columns if name.startswith("_hoodie_")
]
incoming_df = incoming_df.drop(*hoodie_metadata_columns)
incoming_df.printSchema()

# Recompute the partition path column and flag the row as deleted.
# NOTE(review): the flag is written as the string "true". The query output
# recorded later in this notebook still returns this row after the upsert, so
# the upsert does not appear to remove it; presumably Hudi expects a
# boolean-typed _hoodie_is_deleted column for that - confirm against the Hudi docs.
partition_path_expr = concat(lit("year="), col("year"), lit("/month="), col("month"))
incoming_df = (
    incoming_df
    .withColumn(hudiTablePartitionKey, partition_path_expr)
    .withColumn("_hoodie_is_deleted", lit("true"))
    .cache()
)
incoming_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: string (nullable = true)
 |-- partitionkey: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: long (nullable = true)

+---+---+---+-----------------+----+-----+------------------+
| id| sk|txt|     partitionKey|year|month|_hoodie_is_deleted|
+---+---+---+-----------------+----+-----+------------------+
| 11| 11|  L|year=2019/month=6|2019|    6|              true|
+---+---+---+-----------------+----+-----+------------------+

In [99]:
# Upsert the flagged row into the existing Hudi table (append mode), with
# Hive sync enabled.
upsert_options = {
    # Record identity and ordering.
    PRECOMBINE_FIELD_OPT_KEY: config["sort_key"],
    RECORDKEY_FIELD_OPT_KEY: config["primary_key"],
    TABLE_NAME: config['table_name'],
    # Write operation and its shuffle parallelism.
    OPERATION_OPT_KEY: UPSERT_OPERATION_OPT_VAL,
    UPSERT_PARALLELISM: 3,
    # Hive sync settings.
    HIVE_PARTITION_FIELDS_OPT_KEY: config["partition_keys"],
    HIVE_TABLE_OPT_KEY: config['table_name'],
    HIVE_SYNC_ENABLED_OPT_KEY: "true",
    HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: MULIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL,
    # Column holding the partition path of each record.
    PARTITIONPATH_FIELD_OPT_KEY: "partitionKey",
}

(incoming_df.write.format(HUDI_FORMAT)
      .options(**upsert_options)
      .mode("Append")
      .save(config['target']))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [100]:
# Check the row that was just upserted. Its id is read from incoming_df rather
# than hard-coded: that row was picked with "limit 1" and no ORDER BY, so which
# id comes back is not guaranteed to be the same on every run.
upserted_id = incoming_df.select("id").first()[0]
spark.sql("Select id, _hoodie_is_deleted from "+config['table_name']+" where id = " + str(upserted_id) + " " ).show(100,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+------------------+
|id |_hoodie_is_deleted|
+---+------------------+
|11 |true              |
+---+------------------+

In [101]:
# Count the records per commit time and delete-flag value.
commit_summary_sql = (
    "Select _hoodie_commit_time,_hoodie_is_deleted, count(1) as committed_records from "
    + config['table_name']
    + " group by _hoodie_commit_time,_hoodie_is_deleted "
)
spark.sql(commit_summary_sql).show(n=100, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+------------------+-----------------+
|_hoodie_commit_time|_hoodie_is_deleted|committed_records|
+-------------------+------------------+-----------------+
|20210721183627     |false             |19               |
|20210721183830     |true              |1                |
+-------------------+------------------+-----------------+