# Step 1: Configure the Spark Session and Define Imports

In [29]:
%%configure -f
{
"conf": {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.hive.convertMetastoreParquet": "false",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.legacy.pathOptionBehavior.enabled": "true",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,,pyspark,idle,,,,✔


In [None]:
%%sh
pip install Faker

In [30]:
# Turn on Hudi schema-on-read for this Spark session.
# NOTE(review): presumably required by the ALTER TABLE ... DROP/RENAME COLUMN statements run later — confirm.
spark.sql("set hoodie.schema.on.read.enable=true")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[key: string, value: string]

In [31]:
import json
import random

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Step 2: Define Helper Methods for the lab

In [32]:
def _flag_enabled(flag):
    """Return True when ``flag`` is ``True`` or one of the strings "True"/"true"."""
    return flag in (True, "True", "true")


def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Write a dataframe into a Hudi table at ``target_path`` (Spark save mode "append").

    Every ``enable_*`` / ``use_*`` flag is considered "on" when it is ``True`` or one of the
    strings "True"/"true"; any other value (including the string "False") leaves the feature off.

    Args:
        glue_database (str): Database name used for the Hive sync settings.
        table_name (str): The name of the Hudi table (also used as the Hive sync table name).
        record_id (str): Field of the dataframe used as the Hudi record key.
        precomb_key (str): Field of the dataframe used as the pre-combine field.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to write.
        partition_fields (str): Partition path field(s); only used when ``enable_partition`` is on.
        enable_partition (bool | str): Add the partition-path / hive-style partitioning settings.
        enable_cleaner (bool | str): Add the automatic cleaner settings.
        enable_hive_sync (bool | str): Add the Hive sync settings (mode "hms").
        enable_clustering (bool | str): Add the inline clustering settings.
        enable_meta_data_indexing (bool | str): Add the metadata-table / column-stats index settings.
        use_sql_transformer (bool | str): When on, register ``spark_df`` as temp view "temp" and
            replace it with the result of ``sql_transformer_query`` before writing.
        sql_transformer_query (str): SQL query used for the transformation; it can read from "temp".
        target_path (str): The path of the target Hudi table.
        index_type (str): Value for ``hoodie.index.type`` (e.g. BLOOM or GLOBAL_BLOOM).
        method (str): The Hudi write operation (default is 'upsert').
        clustering_column (str): Sort column(s) for clustering; only used when ``enable_clustering`` is on.

    Returns:
        None. Nothing is written when ``spark_df`` has no rows.
    """
    # Basic settings that are always applied
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
        "hoodie.schema.on.read.enable": "true"
    }

    # Hive sync settings (this group also carries the parquet compression codec)
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # Automatic cleaning of old file versions
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
        "hoodie.cleaner.fileversions.retained": "3",
        # Fixed: the key used to be "hoodie-conf hoodie.cleaner.parallelism", which is not a
        # Hudi option name, so the parallelism setting was never picked up.
        "hoodie.cleaner.parallelism": '200',
        # NOTE(review): commits.retained looks relevant only to a commit-based cleaner policy,
        # not KEEP_LATEST_FILE_VERSIONS — confirm whether it is still wanted here.
        'hoodie.cleaner.commits.retained': 5
    }

    # Partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    # Inline clustering, sorted by clustering_column
    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
    }

    # Index type (always applied)
    hudi_index_settings = {
        "hoodie.index.type": index_type,
    }

    # File sizing (always applied)
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    # Metadata table and column-stats index, with in-process locking
    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "true",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
    }

    # Merge the option groups in a fixed order; a later group would win on a duplicate key.
    option_groups = (
        (_flag_enabled(enable_meta_data_indexing), hudi_meta_data_indexing),
        (_flag_enabled(enable_clustering), hudi_clustering),
        (True, hudi_index_settings),
        (True, hudi_file_size),
        (_flag_enabled(enable_partition), partition_settings),
        (_flag_enabled(enable_cleaner), hudi_cleaner_options),
        (_flag_enabled(enable_hive_sync), hudi_hive_sync_settings),
    )
    for is_on, group in option_groups:
        if is_on:
            hudi_final_settings.update(group)

    # Nothing to write for an empty dataframe
    if spark_df.count() > 0:
        if _flag_enabled(use_sql_transformer):
            # The transformer query reads the incoming rows through the "temp" view;
            # `spark` is the session object provided by the notebook kernel.
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
import random
import uuid  # Import the uuid library

def generate_dynamic_json_data(sample_size=10, max_columns=5):
    """
    Build a list of dict records whose set of columns varies from record to record.

    Each record has an 'id' key holding a UUID4 string, followed by columns
    'col1' .. 'colK' where K is drawn with random.randint(1, max_columns).
    Column 'colN' always holds the string 'DummyDataN'; columns beyond K are
    simply absent from that record.

    Args:
        sample_size (int): Number of records to generate. Default is 10.
        max_columns (int): Largest number of 'colN' columns a record can get. Default is 5.

    Returns:
        list: The generated records, one dict per record.
    """
    def _make_record():
        # The unique id comes first so it is the first key of every record
        record = {"id": str(uuid.uuid4())}
        column_count = random.randint(1, max_columns)
        record.update(
            (f"col{position}", f"DummyData{position}")
            for position in range(1, column_count + 1)
        )
        return record

    return [_make_record() for _ in range(sample_size)]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
def print_hudi_table(BUCKET):
    """
    Print the schema of the 'changing_json_schema' Hudi table stored in ``BUCKET``.

    The table is loaded from its S3 path, registered as the temp view
    "hudi_snapshot", and the schema of a select-all over that view is printed.

    Args:
        BUCKET (str): Name of the S3 bucket that holds the table.

    Returns:
        None
    """
    table_location = f"s3://{BUCKET}/silver/table_name=changing_json_schema/"

    # `spark` is the session object provided by the notebook kernel
    snapshot_df = spark.read.format("org.apache.hudi").load(table_location)
    snapshot_df.createOrReplaceTempView("hudi_snapshot")

    selected_df = spark.sql("select * from hudi_snapshot")
    selected_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Batch 1: Initial Schema

In [35]:
# Batch 1 input: 5 synthetic records with at most 3 dynamic columns each.
BUCKET = "datateam-sandbox-qa-demo"  # S3 bucket interpolated into the s3:// table paths used below
max_col = 3
total_col = max_col  # upper bound only: each record gets between 1 and total_col columns
sample_data = generate_dynamic_json_data(sample_size=5, max_columns=total_col)
# Records are plain dicts; columns a record does not carry show up as null in the output.
df = spark.createDataFrame(sample_data)
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+----------+--------------------+
|      col1|      col2|      col3|                  id|
+----------+----------+----------+--------------------+
|DummyData1|DummyData2|DummyData3|d1a49d14-fa74-4d1...|
|DummyData1|DummyData2|      null|1c4e6b27-1886-4b9...|
|DummyData1|      null|      null|6e39283e-db58-48c...|
|DummyData1|DummyData2|      null|6c3e9e94-1f65-4aa...|
|DummyData1|DummyData2|      null|477f9a99-a4b4-439...|
+----------+----------+----------+--------------------+

In [36]:
# Batch 1 write: upsert the dataframe into the Hudi table, then print the table schema.
upsert_hudi_table(
    glue_database="hudidb",
    table_name="changing_json_schema",
    record_id="id",
    precomb_key="id",  # same field as the record key
    table_type='COPY_ON_WRITE',
    partition_fields="n/a",  # placeholder: partition settings are only applied when enable_partition is on
    method='upsert',
    index_type='BLOOM',
    enable_partition=False,
    enable_cleaner=True,
    enable_hive_sync=True,
    enable_clustering='False',  # only True/"True"/"true" switch a feature on, so clustering stays off
    clustering_column='default',
    enable_meta_data_indexing='false',  # likewise treated as off
    use_sql_transformer=False,
    sql_transformer_query='default',  # placeholder: only run when use_sql_transformer is on
    target_path=f"s3://{BUCKET}/silver/table_name=changing_json_schema/",
    spark_df=df,
)

print_hudi_table(BUCKET=BUCKET)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)
 |-- id: string (nullable = true)

# Batch 2: Different Schema

In [37]:
# Batch 2 input: 5 synthetic records with at most 6 dynamic columns each, so the
# dataframe can carry columns that batch 1 did not have.
BUCKET = "datateam-sandbox-qa-demo"  # S3 bucket interpolated into the s3:// table paths used below
max_col = 6
total_col = max_col  # upper bound only: each record gets between 1 and total_col columns
sample_data = generate_dynamic_json_data(sample_size=5, max_columns=total_col)
# Records are plain dicts; columns a record does not carry show up as null in the output.
df = spark.createDataFrame(sample_data)
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+----------+----------+----------+--------------------+
|      col1|      col2|      col3|      col4|      col5|                  id|
+----------+----------+----------+----------+----------+--------------------+
|DummyData1|DummyData2|DummyData3|DummyData4|DummyData5|1f16c4c6-4cb0-4fc...|
|DummyData1|DummyData2|DummyData3|DummyData4|DummyData5|aae887d7-7504-4a6...|
|DummyData1|DummyData2|DummyData3|      null|      null|b7dcb6eb-a989-456...|
|DummyData1|      null|      null|      null|      null|f0a163f6-4d2f-45e...|
|DummyData1|DummyData2|DummyData3|DummyData4|      null|adcfd4e2-ef9e-453...|
+----------+----------+----------+----------+----------+--------------------+

In [38]:
# Batch 2 write: upsert the wider dataframe into the same table path, then print the table schema.
upsert_hudi_table(
    glue_database="hudidb",
    table_name="changing_json_schema",
    record_id="id",
    precomb_key="id",  # same field as the record key
    table_type='COPY_ON_WRITE',
    partition_fields="n/a",  # placeholder: partition settings are only applied when enable_partition is on
    method='upsert',
    index_type='BLOOM',
    enable_partition=False,
    enable_cleaner=True,
    enable_hive_sync=True,
    enable_clustering='False',  # only True/"True"/"true" switch a feature on, so clustering stays off
    clustering_column='default',
    enable_meta_data_indexing='false',  # likewise treated as off
    use_sql_transformer=False,
    sql_transformer_query='default',  # placeholder: only run when use_sql_transformer is on
    target_path=f"s3://{BUCKET}/silver/table_name=changing_json_schema/",
    spark_df=df,
)

print_hudi_table(BUCKET=BUCKET)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)
 |-- col4: string (nullable = true)
 |-- col5: string (nullable = true)
 |-- id: string (nullable = true)

# Running ALTER Commands to Drop Columns

In [25]:
# List the databases visible to the session, then make hudidb the current database.
spark.sql("show databases;").show()
spark.sql("use hudidb;")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+
|namespace|
+---------+
|  default|
|   hudidb|
+---------+

DataFrame[]

In [39]:
# Drop col5 from the table through Spark SQL, then re-read the table from its
# S3 path and print the schema to show the result.
db_name = "hudidb"
table_name_test = "changing_json_schema"
query = f"ALTER  TABLE  {db_name}.{table_name_test} drop column col5"
spark.sql(query)
print(f"Query Executed  : {query}")

print_hudi_table(BUCKET=BUCKET)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Query Executed  : ALTER  TABLE  hudidb.changing_json_schema drop column col5
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)
 |-- col4: string (nullable = true)
 |-- id: string (nullable = true)

# Renaming Columns

In [40]:
# Rename col1 to mycol through Spark SQL, then re-read the table from its
# S3 path and print the schema to show the result.
db_name = "hudidb"
table_name_test = "changing_json_schema"
query = f"ALTER TABLE {db_name}.{table_name_test} RENAME COLUMN col1 TO mycol"
spark.sql(query)
print(f"Query Executed: {query}")

print_hudi_table(BUCKET=BUCKET)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Query Executed: ALTER TABLE hudidb.changing_json_schema RENAME COLUMN col1 TO mycol
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- mycol: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)
 |-- col4: string (nullable = true)
 |-- id: string (nullable = true)