### Setting up the Environment
We begin by loading the utils.py file, which contains the necessary imports and functions to start a SparkSession.

In [1]:
# Execute utils.py inside this kernel so the imports and helper functions it
# defines are available to the cells below. The recorded output shows that
# running it also starts a SparkSession with app name "Hudi-Notebooks".
%run utils.py

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/10 02:36:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/10 02:36:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


SparkSession started with app name: Hudi-Notebooks


In [2]:
# Session handle used by every read/write cell below; the app name labels
# this notebook's job.
APP_NAME = "Mock data"
spark = get_spark_session(APP_NAME)

SparkSession started with app name: Mock data


In [3]:
# Manual reset, currently disabled. Uncommenting these two statements would
# switch to the `warehouse` database and drop the `hudi.warehouse.vn30` table.
# spark.sql("USE warehouse")
# spark.sql("DROP TABLE hudi.warehouse.vn30")

In [4]:
# Load the raw VN30 JSON. `multiLine` lets Spark parse JSON documents that
# span several lines instead of expecting one record per line.
df_vn30_raw = spark.read.option("multiLine", "true").json("s3a://raw/vn30.json")
# rename_df_columns is not defined in this notebook (presumably it comes from
# utils.py and normalises column names - confirm). The record key below must
# match a column name as it exists AFTER this call.
df_vn30 = rename_df_columns(df_vn30_raw)

table_vn30 = "vn30"
base_path_vn30 = "s3a://warehouse"

hudi_conf_vn30 = {
    "hoodie.table.name": table_vn30, # The name of our Hudi table.
    "hoodie.database.name": "warehouse",
    "hoodie.datasource.write.recordkey.field": "stock_symbol", # The column that acts as the unique identifier for each record.
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
    # Optional settings left disabled. `city` and `ts` look like placeholder
    # column names from a template - confirm against this dataset before enabling.
    # "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
    # "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
    # "hoodie.table.cdc.enabled": "true",
    # "hoodie.datasource.write.hive_style_partitioning": "true", # This ensures partition directories are named like `city=new_york`.
    "hoodie.datasource.hive_sync.mode": "hms",
    # In "hms" mode Hudi talks to the Hive Metastore over Thrift, so the
    # endpoint belongs in `metastore.uris`. `jdbcurl` is the HiveServer2 JDBC
    # URL and is only used by "jdbc" mode.
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore:9083",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.schema.on.read.enable": "true",
}

# Write the DataFrame to a Hudi COW table.
# The default operation is "upsert" if this is not specified.
# mode("overwrite") recreates the table at the target path, so re-running this
# cell replaces whatever was written before rather than merging into it.
(
    df_vn30.write
    .format("hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_conf_vn30)
    .mode("overwrite")
    .save(f"{base_path_vn30}/{table_vn30}")
)

                                                                                



In [5]:
# DISABLED CELL: Hudi load for s3a://raw/corporate-actions.json. Every line
# below is commented out, so nothing here runs and no `corporate_action` table
# is written by this notebook.
# NOTE(review): before re-enabling, check the hive_sync settings - in "hms"
# mode the metastore endpoint is normally supplied through
# `hoodie.datasource.hive_sync.metastore.uris`, not `jdbcurl`.
# df_corporate_action = spark.read.option("multiLine", "true").json("s3a://raw/corporate-actions.json")
# df_corporate_action = rename_df_columns(df_corporate_action)

# table_corporate_action = "corporate_action"
# base_path_corporate_action = f"s3a://warehouse"

# hudi_conf_corporate_action = {
#     "hoodie.table.name": table_corporate_action, # The name of our Hudi table.
#     "hoodie.database.name": "warehouse",
#     "hoodie.datasource.write.recordkey.field": "symbol", # The column that acts as the unique identifier for each record.
#     "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
#     # "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
#     # "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
#     # "hoodie.table.cdc.enabled": "true",
#     # "hoodie.datasource.write.hive_style_partitioning": "true", # This ensures partition directories are named like `city=new_york`.
#     "hoodie.datasource.hive_sync.mode": "hms",
#     "hoodie.datasource.hive_sync.jdbcurl": "thrift://hive-metastore:9083",
#     "hoodie.datasource.hive_sync.enable": "true",
#     "hoodie.datasource.hive_sync.support_timestamp": "true",
#     "hoodie.schema.on.read.enable": "true",
# }

# # Write the DataFrame to a Hudi COW table
# # The default operation is "upsert" if this is not specified.
# df_corporate_action.write \
#     .format("hudi") \
#     .option("hoodie.datasource.write.operation", "upsert") \
#     .options(**hudi_conf_corporate_action) \
#     .mode("overwrite") \
#     .save(f"{base_path_corporate_action}/{table_corporate_action}")

In [6]:
# Load the raw industry JSON. `multiLine` lets Spark parse JSON documents that
# span several lines instead of expecting one record per line.
df_industry_raw = spark.read.option("multiLine", "true").json("s3a://raw/industry.json")
# rename_df_columns is not defined in this notebook (presumably it comes from
# utils.py and normalises column names - confirm). The record key below must
# match a column name as it exists AFTER this call.
df_industry = rename_df_columns(df_industry_raw)

table_industry = "industry"
base_path_industry = "s3a://warehouse"

hudi_conf_industry = {
    "hoodie.table.name": table_industry, # The name of our Hudi table.
    "hoodie.database.name": "warehouse",
    "hoodie.datasource.write.recordkey.field": "icb_code", # The column that acts as the unique identifier for each record.
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
    # Optional settings left disabled. `city` and `ts` look like placeholder
    # column names from a template - confirm against this dataset before enabling.
    # "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
    # "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
    # "hoodie.table.cdc.enabled": "true",
    # "hoodie.datasource.write.hive_style_partitioning": "true", # This ensures partition directories are named like `city=new_york`.
    "hoodie.datasource.hive_sync.mode": "hms",
    # In "hms" mode Hudi talks to the Hive Metastore over Thrift, so the
    # endpoint belongs in `metastore.uris`. `jdbcurl` is the HiveServer2 JDBC
    # URL and is only used by "jdbc" mode.
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore:9083",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.schema.on.read.enable": "true",
}

# Write the DataFrame to a Hudi COW table.
# The default operation is "upsert" if this is not specified.
# mode("overwrite") recreates the table at the target path, so re-running this
# cell replaces whatever was written before rather than merging into it.
(
    df_industry.write
    .format("hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_conf_industry)
    .mode("overwrite")
    .save(f"{base_path_industry}/{table_industry}")
)

In [7]:
# DISABLED CELL: Hudi load for s3a://raw/stock-info.json. Every line below is
# commented out, so nothing here runs and no `stock_info` table is written by
# this notebook.
# NOTE(review): before re-enabling, check the hive_sync settings - in "hms"
# mode the metastore endpoint is normally supplied through
# `hoodie.datasource.hive_sync.metastore.uris`, not `jdbcurl`.
# df_stock_info = spark.read.option("multiLine", "true").json("s3a://raw/stock-info.json")
# df_stock_info = rename_df_columns(df_stock_info)

# table_stock_info = "stock_info"
# base_path_stock_info = f"s3a://warehouse"

# hudi_conf_stock_info = {
#     "hoodie.table.name": table_stock_info, # The name of our Hudi table.
#     "hoodie.database.name": "warehouse",
#     "hoodie.datasource.write.recordkey.field": "symbol", # The column that acts as the unique identifier for each record.
#     "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
#     # "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
#     # "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
#     # "hoodie.table.cdc.enabled": "true",
#     # "hoodie.datasource.write.hive_style_partitioning": "true", # This ensures partition directories are named like `city=new_york`.
#     "hoodie.datasource.hive_sync.mode": "hms",
#     "hoodie.datasource.hive_sync.jdbcurl": "thrift://hive-metastore:9083",
#     "hoodie.datasource.hive_sync.enable": "true",
#     "hoodie.datasource.hive_sync.support_timestamp": "true",
#     "hoodie.schema.on.read.enable": "true",
# }

# # Write the DataFrame to a Hudi COW table
# # The default operation is "upsert" if this is not specified.
# df_stock_info.write \
#     .format("hudi") \
#     .option("hoodie.datasource.write.operation", "upsert") \
#     .options(**hudi_conf_stock_info) \
#     .mode("overwrite") \
#     .save(f"{base_path_stock_info}/{table_stock_info}")

In [8]:
# Load the raw company-basic JSON. `multiLine` lets Spark parse JSON documents
# that span several lines instead of expecting one record per line.
df_company_basic_raw = spark.read.option("multiLine", "true").json("s3a://raw/company-basic.json")
# rename_df_columns is not defined in this notebook (presumably it comes from
# utils.py and normalises column names - confirm). The record key below must
# match a column name as it exists AFTER this call.
df_company_basic = rename_df_columns(df_company_basic_raw)

table_company_basic = "company_basic"
base_path_company_basic = "s3a://warehouse"

hudi_conf_company_basic = {
    "hoodie.table.name": table_company_basic, # The name of our Hudi table.
    "hoodie.database.name": "warehouse",
    "hoodie.datasource.write.recordkey.field": "ticker", # The column that acts as the unique identifier for each record.
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
    # Optional settings left disabled. `city` and `ts` look like placeholder
    # column names from a template - confirm against this dataset before enabling.
    # "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
    # "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
    # "hoodie.table.cdc.enabled": "true",
    # "hoodie.datasource.write.hive_style_partitioning": "true", # This ensures partition directories are named like `city=new_york`.
    "hoodie.datasource.hive_sync.mode": "hms",
    # In "hms" mode Hudi talks to the Hive Metastore over Thrift, so the
    # endpoint belongs in `metastore.uris`. `jdbcurl` is the HiveServer2 JDBC
    # URL and is only used by "jdbc" mode.
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore:9083",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.schema.on.read.enable": "true",
}

# Write the DataFrame to a Hudi COW table.
# The default operation is "upsert" if this is not specified.
# mode("overwrite") recreates the table at the target path, so re-running this
# cell replaces whatever was written before rather than merging into it.
(
    df_company_basic.write
    .format("hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_conf_company_basic)
    .mode("overwrite")
    .save(f"{base_path_company_basic}/{table_company_basic}")
)

In [9]:
# Load the raw news JSON. `multiLine` lets Spark parse JSON documents that
# span several lines instead of expecting one record per line.
df_news_raw = spark.read.option("multiLine", "true").json("s3a://raw/news.json")
# rename_df_columns is not defined in this notebook (presumably it comes from
# utils.py and normalises column names - confirm). The record key below must
# match a column name as it exists AFTER this call.
df_news = rename_df_columns(df_news_raw)

table_news = "news"
base_path_news = "s3a://warehouse"

hudi_conf_news = {
    "hoodie.table.name": table_news, # The name of our Hudi table.
    "hoodie.database.name": "warehouse",
    # NOTE(review): a news dataset presumably holds several rows per ticker.
    # With an upsert keyed on `ticker` alone, rows sharing a ticker would be
    # collapsed into one. Confirm the key is unique per row, or extend it
    # (for example with an article id) if it is not.
    "hoodie.datasource.write.recordkey.field": "ticker", # The column that acts as the unique identifier for each record.
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
    # Optional settings left disabled. `city` and `ts` look like placeholder
    # column names from a template - confirm against this dataset before enabling.
    # "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
    # "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
    # "hoodie.table.cdc.enabled": "true",
    # "hoodie.datasource.write.hive_style_partitioning": "true", # This ensures partition directories are named like `city=new_york`.
    "hoodie.datasource.hive_sync.mode": "hms",
    # In "hms" mode Hudi talks to the Hive Metastore over Thrift, so the
    # endpoint belongs in `metastore.uris`. `jdbcurl` is the HiveServer2 JDBC
    # URL and is only used by "jdbc" mode.
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore:9083",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.schema.on.read.enable": "true",
}

# Write the DataFrame to a Hudi COW table.
# The default operation is "upsert" if this is not specified.
# mode("overwrite") recreates the table at the target path, so re-running this
# cell replaces whatever was written before rather than merging into it.
(
    df_news.write
    .format("hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_conf_news)
    .mode("overwrite")
    .save(f"{base_path_news}/{table_news}")
)

In [10]:
# Load the raw stocks-fund JSON. `multiLine` lets Spark parse JSON documents
# that span several lines instead of expecting one record per line.
df_stock_fund_raw = spark.read.option("multiLine", "true").json("s3a://raw/stocks-fund.json")
# rename_df_columns is not defined in this notebook (presumably it comes from
# utils.py and normalises column names - confirm). The record key below must
# match a column name as it exists AFTER this call.
df_stock_fund = rename_df_columns(df_stock_fund_raw)

table_stock_fund = "stock_fund"
base_path_stock_fund = "s3a://warehouse"

hudi_conf_stock_fund = {
    "hoodie.table.name": table_stock_fund, # The name of our Hudi table.
    "hoodie.database.name": "warehouse",
    "hoodie.datasource.write.recordkey.field": "symbol", # The column that acts as the unique identifier for each record.
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
    # Optional settings left disabled. `city` and `ts` look like placeholder
    # column names from a template - confirm against this dataset before enabling.
    # "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
    # "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
    # "hoodie.table.cdc.enabled": "true",
    # "hoodie.datasource.write.hive_style_partitioning": "true", # This ensures partition directories are named like `city=new_york`.
    "hoodie.datasource.hive_sync.mode": "hms",
    # In "hms" mode Hudi talks to the Hive Metastore over Thrift, so the
    # endpoint belongs in `metastore.uris`. `jdbcurl` is the HiveServer2 JDBC
    # URL and is only used by "jdbc" mode.
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore:9083",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.schema.on.read.enable": "true",
}

# Write the DataFrame to a Hudi COW table.
# The default operation is "upsert" if this is not specified.
# mode("overwrite") recreates the table at the target path, so re-running this
# cell replaces whatever was written before rather than merging into it.
(
    df_stock_fund.write
    .format("hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_conf_stock_fund)
    .mode("overwrite")
    .save(f"{base_path_stock_fund}/{table_stock_fund}")
)

In [11]:
# Load the raw event JSON. `multiLine` lets Spark parse JSON documents that
# span several lines instead of expecting one record per line.
df_event_raw = spark.read.option("multiLine", "true").json("s3a://raw/event.json")
# rename_df_columns is not defined in this notebook (presumably it comes from
# utils.py and normalises column names - confirm). The record key below must
# match column names as they exist AFTER this call.
df_event = rename_df_columns(df_event_raw)

table_event = "event"
base_path_event = "s3a://warehouse"

hudi_conf_event = {
    "hoodie.table.name": table_event, # The name of our Hudi table.
    "hoodie.database.name": "warehouse",
    # Composite record key: the (id, symbol) pair identifies a record. Written
    # as a comma-separated list with no whitespace so the field names are
    # passed to Hudi exactly as the columns are named.
    "hoodie.datasource.write.recordkey.field": "id,symbol",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
    # Optional settings left disabled. `city` and `ts` look like placeholder
    # column names from a template - confirm against this dataset before enabling.
    # "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
    # "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
    # "hoodie.table.cdc.enabled": "true",
    # "hoodie.datasource.write.hive_style_partitioning": "true", # This ensures partition directories are named like `city=new_york`.
    "hoodie.datasource.hive_sync.mode": "hms",
    # In "hms" mode Hudi talks to the Hive Metastore over Thrift, so the
    # endpoint belongs in `metastore.uris`. `jdbcurl` is the HiveServer2 JDBC
    # URL and is only used by "jdbc" mode.
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore:9083",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.schema.on.read.enable": "true",
}

# Write the DataFrame to a Hudi COW table.
# The default operation is "upsert" if this is not specified.
# mode("overwrite") recreates the table at the target path, so re-running this
# cell replaces whatever was written before rather than merging into it.
(
    df_event.write
    .format("hudi")
    .option("hoodie.datasource.write.operation", "upsert")
    .options(**hudi_conf_event)
    .mode("overwrite")
    .save(f"{base_path_event}/{table_event}")
)

In [12]:
# Shut down the SparkSession once all of the writes above have run. Any cell
# executed after this point needs a new session first.
spark.stop()