In [5]:
%session_id_prefix native-hudi-dataframe-
%glue_version 3.0
%idle_timeout 60
%%configure 
{
  "--conf": "spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false",
  "--datalake-formats": "hudi"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.3 
Setting session ID prefix to native-hudi-dataframe-
Setting Glue version to: 3.0
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 60 minutes.
The following configurations have been updated: {'--conf': 'spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false', '--datalake-formats': 'hudi'}


In [1]:
bucket_name = "mojap-raw-hist-sandbox-608"
bucket_prefix = "hmpps/oasys/EOR/BACKUP_SYNTHETIC/OASYS_QUESTION_SANDBOXY"
database_name = "hudi_df"
table_name = "oasys_synth_q"
table_prefix = f"{bucket_prefix}/{database_name}/{table_name}"
table_location = f"s3://{bucket_name}/{table_prefix}"

Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::684969100054:role/AdminAccessGlueNotebook
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 05a8d8ad-7d79-45ca-b7c3-17c45e4b69f0
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.3
--enable-glue-datacatalog true
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
--datalake-formats hudi
Waiting for session 05a8d8ad-7d79-45ca-b7c3-17c45e4b69f0 to get into ready status...
Session 05a8d8ad-7d79-45ca-b7c3-17c45e4b69f0 has been created.



## Clean up existing resources

In [2]:
import boto3

## Create a database with the name hudi_df to host hudi tables if not exists.
try:
    glue = boto3.client('glue')
    glue.create_database(DatabaseInput={'Name': database_name})
    print(f"New database {database_name} created")
except glue.exceptions.AlreadyExistsException:
    print(f"Database {database_name} already exist")

## Delete files in S3
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket_name)
bucket.objects.filter(Prefix=f"{table_prefix}/").delete()

## Drop table in Glue Data Catalog
try:
    glue = boto3.client('glue')
    glue.delete_table(DatabaseName=database_name, Name=table_name)
except glue.exceptions.EntityNotFoundException:
    print(f"Table {database_name}.{table_name} does not exist")


Database hudi_df already exist
{'ResponseMetadata': {'RequestId': '089b078d-1040-4639-81e1-e966b3f504c9', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Wed, 17 May 2023 07:04:51 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2', 'connection': 'keep-alive', 'x-amzn-requestid': '089b078d-1040-4639-81e1-e966b3f504c9'}, 'RetryAttempts': 0}}


## Create Hudi table with sample data using catalog sync

In [3]:
from pyspark.sql import Row
import time
from pyspark.sql.functions import col, to_timestamp,lit
ut = time.time()


df_products = spark.read.parquet("s3://mojap-raw-hist-sandbox-608/hmpps/oasys/EOR/BACKUP_SYNTHETIC/OASYS_QUESTION_SANDBOXY/")
# Convert the date_string column to timestamp
df_products = df_products.withColumn("date_timestamp", to_timestamp(col("extraction_timestamp"), 'yyyy-MM-dd'))
df_products = df_products.withColumn("op", lit(None).cast("string"))







In [4]:
df_initial_full_load = df_products.sample(0.001)
df_to_insert = df_products.sample(0.001)





In [5]:
df_products.count()

16365759


In [6]:
df_initial_full_load.count()

16372


In [7]:
df_to_insert.count()

16367


In [8]:
df_initial_full_load.printSchema()

root
 |-- op: string (nullable = true)
 |-- extraction_timestamp: string (nullable = true)
 |-- scn: string (nullable = true)
 |-- oasys_question_pk: decimal(38,10) (nullable = true)
 |-- date_timestamp: timestamp (nullable = true)


In [9]:
table_location = f"s3://mojap-raw-hist-sandbox-608/hmpps/oasys/EOR/BACKUP_SYNTHETIC/HUDIEXAMPLE"
table_location

's3://mojap-raw-hist-sandbox-608/hmpps/oasys/EOR/BACKUP_SYNTHETIC/HUDIEXAMPLE'


In [10]:
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'oasys_question_pk',
    'hoodie.datasource.write.partitionpath.field': 'scn',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'date_timestamp',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'path': table_location,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.partition_fields': 'scn',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.mode': 'hms'
}





In [11]:
df_initial_full_load.write.format("hudi")  \
    .options(**hudi_options)  \
    .mode("overwrite")  \
    .save()




## note : for initial load the following two options need to be configured 
```.option('hoodie.datasource.write.operation', 'upsert')``` on hudi options

and 

``` .mode("overwrite")  ``` on  write statement

## Read from Hudi table

In [12]:
df_products_read = spark.read  \
    .format("hudi")  \
    .load(table_location)
df_products_read.show(1,vertical=True,truncate=False)


-RECORD 0-------------------------------------------------------------------------------------------
 _hoodie_commit_time    | 20230517070522865                                                         
 _hoodie_commit_seqno   | 20230517070522865_0_1                                                     
 _hoodie_record_key     | 8975953.9015971010                                                        
 _hoodie_partition_path | scn=SYNTHETICS                                                            
 _hoodie_file_name      | dd0de1d2-cb82-4bd5-8cc9-6b02fc50ca6a-0_0-36-193_20230517070522865.parquet 
 op                     | null                                                                      
 extraction_timestamp   | 2023-04-01                                                                
 oasys_question_pk      | 8975953.9015971010                                                        
 date_timestamp         | 2023-04-01 00:00:00                                              

In [13]:
df_products_read.count()

16372


## Upsert records into Hudi table

In [14]:
from pyspark.sql.functions import lit
ut = time.time()




# sample 1/5 of the already existing data from the full load inital data
df_to_update = df_initial_full_load.sample(0.2)
# Update the op column
df_to_update = df_to_update.withColumn("op", lit("U"))
df_to_update.show(1,vertical=True)

df_to_update.count()


-RECORD 0-----------------------------------
 op                   | U                   
 extraction_timestamp | 2023-04-01          
 scn                  | SYNTHETICS          
 oasys_question_pk    | 8976191.6095581650  
 date_timestamp       | 2023-04-01 00:00:00 
only showing top 1 row

3281


In [15]:
# Combine the updated data with new data (inserts)
df_product_updates = df_to_insert.union(df_to_update)
df_product_updates.count()

19648


In [16]:
df_product_updates.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save()




In [17]:
df_product_updates_read = spark.read  \
    .format("hudi")  \
    .load(table_location)





In [18]:
df_product_updates_read.show(2,vertical=True)



-RECORD 0--------------------------------------
 _hoodie_commit_time    | 20230517070549924    
 _hoodie_commit_seqno   | 20230517070549924... 
 _hoodie_record_key     | 8975953.9015971010   
 _hoodie_partition_path | scn=SYNTHETICS       
 _hoodie_file_name      | dd0de1d2-cb82-4bd... 
 op                     | U                    
 extraction_timestamp   | 2023-04-01           
 oasys_question_pk      | 8975953.9015971010   
 date_timestamp         | 2023-04-01 00:00:00  
 scn                    | SYNTHETICS           
-RECORD 1--------------------------------------
 _hoodie_commit_time    | 20230517070522865    
 _hoodie_commit_seqno   | 20230517070522865... 
 _hoodie_record_key     | 8976150.5414756650   
 _hoodie_partition_path | scn=SYNTHETICS       
 _hoodie_file_name      | dd0de1d2-cb82-4bd... 
 op                     | null                 
 extraction_timestamp   | 2023-04-01           
 oasys_question_pk      | 8976150.5414756650   
 date_timestamp         | 2023-04-01 00:

In [19]:
df_product_updates_read.filter(col("op")=="U").show(2,vertical=True,truncate=False)

-RECORD 0-------------------------------------------------------------------------------------------
 _hoodie_commit_time    | 20230517070549924                                                         
 _hoodie_commit_seqno   | 20230517070549924_0_16373                                                 
 _hoodie_record_key     | 8975953.9015971010                                                        
 _hoodie_partition_path | scn=SYNTHETICS                                                            
 _hoodie_file_name      | dd0de1d2-cb82-4bd5-8cc9-6b02fc50ca6a-0_0-84-415_20230517070549924.parquet 
 op                     | U                                                                         
 extraction_timestamp   | 2023-04-01                                                                
 oasys_question_pk      | 8975953.9015971010                                                        
 date_timestamp         | 2023-04-01 00:00:00                                              

In [20]:
df_product_updates_read.count()

32721


## note : for upsert operation the following two options need to be configured 
```.option('hoodie.datasource.write.operation', 'upsert')``` on hudi options

and 

``` .mode("append")  ``` on  write statement

## Delete a Record
To hard delete a record, you can upsert an empty payload. In this case, the PAYLOAD_CLASS_OPT_KEY option specifies the EmptyHoodieRecordPayload class.

In [21]:
df_delete = df_product_updates_read.filter(col("op")=="U").sample(0.9)




In [22]:
df_delete.write \
    .format("org.apache.hudi") \
    .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.EmptyHoodieRecordPayload") \
    .options(**hudi_options) \
    .mode("append") \
    .save() 




In [23]:
df_product_delete_read = spark.read  \
    .format("hudi")  \
    .load(table_location)





In [24]:
df_product_delete_read.show(2,vertical=True,truncate=False)

-RECORD 0-------------------------------------------------------------------------------------------
 _hoodie_commit_time    | 20230517070522865                                                         
 _hoodie_commit_seqno   | 20230517070522865_0_2                                                     
 _hoodie_record_key     | 8976150.5414756650                                                        
 _hoodie_partition_path | scn=SYNTHETICS                                                            
 _hoodie_file_name      | dd0de1d2-cb82-4bd5-8cc9-6b02fc50ca6a-0_0-36-193_20230517070522865.parquet 
 op                     | null                                                                      
 extraction_timestamp   | 2023-04-01                                                                
 oasys_question_pk      | 8976150.5414756650                                                        
 date_timestamp         | 2023-04-01 00:00:00                                              

In [25]:
df_product_delete_read.count()

29788


## to soft delete a record??

To soft delete a record?? 
TOCHANGE (you can upsert an empty payload. In this case, the PAYLOAD_CLASS_OPT_KEY option specifies the EmptyHoodieRecordPayload class.)

## SCD2 with HUDI

## Point in time query
Lets look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a specific commit time and beginTime to "000" (denoting earliest possible commit time).

In [27]:
spark.read  \
    .format("hudi") \
    .load(table_location) \
    .createOrReplaceTempView("hudi_snapshot")




In [28]:
# store commits history as a list
commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_snapshot order by commitTime").limit(50).collect()))




In [29]:
commits

['20230516152948466', '20230516153038283']


In [30]:
beginTime = "000" # Represents all commits > this time.
endTime = commits[len(commits) - 2]

# query point in time data
point_in_time_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.end.instanttime': endTime,
    'hoodie.datasource.read.begin.instanttime': beginTime
}

# get the initial table before upsert and delete
df_product_point_in_time_read = spark.read.format("hudi") \
    .options(**point_in_time_read_options)  \
    .load(table_location) \
    .show()



+-------------------+--------------------+------------------+----------------------+--------------------+----+--------------------+------------------+-------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|  op|extraction_timestamp| oasys_question_pk|     date_timestamp|       scn|
+-------------------+--------------------+------------------+----------------------+--------------------+----+--------------------+------------------+-------------------+----------+
|  20230516152948466|20230516152948466...|8976181.1755816680|        scn=SYNTHETICS|156ddd66-6daa-419...|null|          2023-04-01|8976181.1755816680|2023-04-01 00:00:00|SYNTHETICS|
|  20230516152948466|20230516152948466...|8975987.4627111000|        scn=SYNTHETICS|156ddd66-6daa-419...|null|          2023-04-01|8975987.4627111000|2023-04-01 00:00:00|SYNTHETICS|
|  20230516152948466|20230516152948466...|8976331.8975047640|        scn=SYNTHETICS|156ddd

AttributeError: 'NoneType' object has no attribute 'count'


## Incremental Query
Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. We do not need to specify endTime, if we want all changes after the given commit.

In [32]:
beginTime = commits[len(commits) - 2] # commit time we are interested in

# incrementally query data
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime
}

df_product_incremental_read = spark.read.format("hudi") \
    .options(**incremental_read_options)  \
    .load(table_location) \
    .show()

+-------------------+--------------------+--------------------+----------------------+--------------------+---+--------------------+--------------------+-------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| op|extraction_timestamp|   oasys_question_pk|     date_timestamp|       scn|
+-------------------+--------------------+--------------------+----------------------+--------------------+---+--------------------+--------------------+-------------------+----------+
|  20230516153038283|20230516153038283...|  8976022.0697471090|        scn=SYNTHETICS|156ddd66-6daa-419...|  U|          2023-04-01|  8976022.0697471090|2023-04-01 00:00:00|SYNTHETICS|
|  20230516153038283|20230516153038283...|  8976215.9790765400|        scn=SYNTHETICS|156ddd66-6daa-419...|  U|          2023-04-01|  8976215.9790765400|2023-04-01 00:00:00|SYNTHETICS|
|  20230516153038283|20230516153038283...|  8976223.9416430870|        scn=

## Stop Session

In [None]:
%stop_session