# Initialize the Spark session

In [1]:
from IPython.core.display import HTML
# Notebook-wide CSS: stop <pre> blocks from wrapping (so wide DataFrame.show()
# tables stay on one line) and let `.container` elements use the full width.
notebook_css = """<style>pre { white-space: pre !important; }.container { width:100% !important; }</style>"""
display(HTML(notebook_css))

In [2]:
from pyspark.sql import SparkSession

# Session options, applied one by one to the builder below.
session_options = {
    "spark.driver.memory": "1g",
    "hive.metastore.uris": "thrift://hive-metastore:9083",
    "spark.sql.warehouse.dir": "hdfs://yarn-master:9000/user/hive/warehouse/default",
    # Iceberg runtime jars, comma-separated.
    "spark.jars": """
        hdfs://yarn-master:9000/user/hive/spark_jars/iceberg-hive-runtime-1.4.3.jar,
        hdfs://yarn-master:9000/user/hive/spark_jars/iceberg-spark-runtime-3.4_2.12-1.4.3.jar
    """,
    # Iceberg SQL extensions, and Iceberg's session catalog as `spark_catalog`.
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
    # Executor sizing: one executor with 3 cores and 6g of memory.
    "spark.executor.cores": 3,
    "spark.executor.memory": "6g",
    "spark.executor.instances": 1,
}

session_builder = SparkSession.builder.master("yarn").appName("staging-to-rawvault-streaming")
for option_key, option_value in session_options.items():
    session_builder = session_builder.config(option_key, option_value)
spark = session_builder.enableHiveSupport().getOrCreate()

# Root directory of the staging area; topic directories are read from "<staging_path>/topics".
staging_path = "/user/hive/warehouse/staging"
# Prefix shared by the topic directories handled in this notebook.
data_source = "product1.public"

25/06/22 06:06:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/22 06:06:20 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [13]:
# List the tables registered in the `staging` database (up to 10 rows, untruncated).
spark.sql("show tables from staging").show(n=10, truncate=False)

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|staging  |test_hive|false      |
+---------+---------+-----------+



In [14]:
# List the tables registered in the `rawvault` database (up to 10 rows, untruncated).
spark.sql("show tables from rawvault").show(n=10, truncate=False)

+---------+----------------------------+-----------+
|namespace|tableName                   |isTemporary|
+---------+----------------------------+-----------+
|rawvault |product1_public_customer_der|false      |
|rawvault |test_iceberg                |false      |
+---------+----------------------------+-----------+



# Create staging tables for newly discovered topics

In [16]:
import re

# Find the topic directories under "<staging_path>/topics" that belong to
# `data_source`, and register each one that has no staging table yet as an
# external Parquet table partitioned by `__ds`.

# `data_source` contains dots. Unescaped, each "." would match any character, and
# without a required "." separator a sibling source such as "product1.public2"
# would match as well.
pattern = re.compile(rf"{re.escape(data_source)}\..+")

hadoop = spark.sparkContext._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path(f"{staging_path}/topics")
# Path.getName() is the final path component, i.e. the topic directory name.
all_paths = [status.getPath().getName() for status in fs.get(conf).listStatus(path)]
all_tables = [topic for topic in all_paths if pattern.match(topic)]
print(f"{'#'*2} all_tables={all_tables}")

for table in all_tables:
    # Table names cannot contain dots, so "a.b.c" becomes "a_b_c".
    table_name = table.replace(".", "_")

    # Check for the table first so the Parquet schema is only read when needed.
    if not spark.sql(f"show tables from staging like '{table_name}'").isEmpty():
        continue

    hdfs_path = f"hdfs://yarn-master:9000{staging_path}/topics/{table}"
    # The schema comes from the Parquet files. The CSV-only options
    # header/inferSchema had no effect on a Parquet reader and were dropped.
    df = spark.read.parquet(hdfs_path)
    # Backtick-quote column names so unusual or reserved names stay valid DDL.
    columns_ddl = ", ".join(f"`{col}` {dtype}" for col, dtype in df.dtypes)

    print(f"{'#'*2} create table staging.{table_name}")
    spark.sql(f"""
        create external table if not exists staging.{table_name}
        ({columns_ddl})
        partitioned by (__ds)
        stored as parquet
        location "{hdfs_path}"
    """)
    # Register the partition directories that already exist under the location.
    spark.sql(f'repair table staging.{table_name}')

## all_tables=['product1.public.customer', 'product1.public.service']


In [30]:
# Preview the staging customer table (up to 10 rows, untruncated).
spark.sql("select * from staging.product1_public_customer").show(n=10, truncate=False)

+-----------+----------+---------+----------+-----------------+------------+------------------+-----------------+-------------------+---------+----+--------+-----------------------+----------+
|customer_id|first_name|last_name|birth_date|address          |phone_number|email             |job_title        |updated_datetime   |__deleted|__op|__lsn   |__src_ts_ms            |__ds      |
+-----------+----------+---------+----------+-----------------+------------+------------------+-----------------+-------------------+---------+----+--------+-----------------------+----------+
|2          |The       |Duong    |2025-06-04|132 ABC, MNO, XYZ|0865937124  |theduong@gmail.com|Software Engineer|2025-06-22 00:01:40|false    |u   |27099384|2025-06-21 17:01:40.226|2025-06-22|
|2          |The       |Duong    |2025-06-05|132 ABC, MNO, XYZ|0865937124  |theduong@gmail.com|Software Engineer|2025-06-22 00:01:44|false    |u   |27101464|2025-06-21 17:01:44.327|2025-06-22|
|2          |The       |Duong    |2

# Create rawvault tables for staging tables that do not have one yet

In [3]:
def _list_tables(database, name_pattern):
    """Return the names of the tables in `database` that match `name_pattern`.

    `name_pattern` is passed verbatim to `show tables ... like '<pattern>'`.
    The result is a plain Python list of table-name strings.
    """
    rows = (
        spark.sql(f"show tables from {database} like '{name_pattern}'")
        .select("tableName")
        .collect()
    )
    return [row["tableName"] for row in rows]


# Table-name prefix for this data source: dots replaced by underscores.
table_name_pattern = data_source.replace(".", "_")
staging_table_list = _list_tables("staging", f"{table_name_pattern}_*")
rawvault_table_list = _list_tables("rawvault", f"{table_name_pattern}_*")

# Staging tables with no rawvault table of the same name. A set difference has no
# stable order, so sort it to keep the creation order reproducible between runs.
new_tables = sorted(set(staging_table_list) - set(rawvault_table_list))

                                                                                

In [32]:
# For every new staging table, create three Iceberg tables in `rawvault` with the
# staging table's columns: "<table>_der", "<table>_snp" and "<table>". Only the
# main table is partitioned (by day of `updated_datetime`).
for table in new_tables:
    # All three tables share the staging schema, so resolve it once per staging
    # table instead of once per rawvault table.
    df = spark.sql(f"select * from staging.{table}")
    # Backtick-quote column names so unusual or reserved names stay valid DDL.
    columns_ddl = ", ".join(f"`{col}` {dtype}" for col, dtype in df.dtypes)

    tables = {"derived": f"{table}_der", "snapshot": f"{table}_snp", "main": table}
    for table_type, table_name in tables.items():
        print(f"{'#'*2} create table rawvault.{table_name}")
        partition_clause = (
            'partitioned by (days(updated_datetime))' if table_type == 'main' else ''
        )
        spark.sql(f"""
        create external table if not exists rawvault.{table_name}
        ({columns_ddl})
        using iceberg
        {partition_clause}
        location 'hdfs://yarn-master:9000/user/hive/warehouse/rawvault/{table_name}'
        tblproperties(
            'objcapabilities'='extread,extwrite',
            'engine.hive.enabled'='true',
            'write.delete.mode'='copy-on-write',
            'write.update.mode'='copy-on-write',
            'write.merge.mode'='copy-on-write',
            'external.table.purge'='true',
            'iceberg.file_format'='parquet',
            'format-version'='2',
            'read.parquet.vectorization.batch-size'='10000',
            'read.parquet.vectorization.enabled'='false'
        )
        """)

product1_public_service_der
product1_public_service_snp
product1_public_service
product1_public_customer_der
product1_public_customer_snp
product1_public_customer


# Read the staging topics as streams and write them out

In [16]:
# stream_reader = (
#     spark.readStream
#     .schema(spark.sql(f"select * from staging.product1_public_customer").schema)
#     .parquet(f"hdfs://yarn-master:9000{staging_path}/topics/{staging_table.replace('_', '.')}")
# )
# stream_writer = (
#     stream_reader.writeStream
#         .outputMode("append").format("console")
#         .start()
# )
# stream_writer.awaitTermination()

In [None]:
# Start one streaming query per staging table: read the table's topic directory
# as a Parquet stream and print each micro-batch to the console.

# Staging table names are "<data_source with dots as underscores>_<table>".
table_prefix = data_source.replace(".", "_") + "_"

for staging_table in staging_table_list:
    # Rebuild the topic directory name by restoring only the `data_source` prefix.
    # A blanket replace('_', '.') would also rewrite underscores that belong to the
    # table name itself (e.g. "order_item") and point at a path that does not exist.
    topic_name = f"{data_source}.{staging_table[len(table_prefix):]}"

    stream_reader = (
        spark.readStream
        # The stream reuses the schema of the already-registered staging table.
        .schema(spark.sql(f"select * from staging.{staging_table}").schema)
        .parquet(f"hdfs://yarn-master:9000{staging_path}/topics/{topic_name}")
    )
    stream_writer = (
        stream_reader.writeStream
            .outputMode("append").format("console")
            .start()
    )
    # TODO: replace the console sink with the Iceberg sink below once verified.
    # stream_query = (
    #     stream_reader.writeStream
    #     .outputMode("append").format("iceberg")
    #     .option("checkpointLocation", f"hdfs://yarn-master:9000{staging_path}/checkpoints/{staging_table}")
    #     .trigger(processingTime="10 seconds")
    #     .toTable(f"rawvault.{staging_table}_der")
    # )

# Block until any one of the started queries terminates.
spark.streams.awaitAnyTermination()

25/06/22 06:03:46 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e65d64f0-03c8-4d8a-b27b-6719adacde6e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/06/22 06:03:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/06/22 06:03:46 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ffdd45c4-7d43-49b6-880e-281a1ebe387a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/06/22 06:03:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not support

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+-----------------+---------+--------------+-----------+-------------+-------------------+---------+----+--------+--------------------+----------+
|service_id|             name|    price|         image|active_date|inactive_date|   updated_datetime|__deleted|__op|   __lsn|         __src_ts_ms|      __ds|
+----------+-----------------+---------+--------------+-----------+-------------+-------------------+---------+----+--------+--------------------+----------+
|         1|    House Keeping|123.45600|        string| 2024-08-17|   9999-01-01|2024-08-17 14:43:01|    false|   r|27105864|2025-06-22 03:43:...|2025-06-22|
|         2| Office for lease|301.00000|        string| 2024-08-17|   9999-01-01|2025-06-19 14:16:33|    false|   r|27105864|2025-06-22 03:43:...|2025-06-22|
|        10|Electrocity check|300.00000|abc/xyz/ec.png| 2025-06-18|   2027-06-18|2025-06-19 14:25:15|    false|  

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+----------+----------+----------+-----------------+------------+------------------+-----------------+-------------------+---------+----+--------+--------------------+----------+
|customer_id|first_name| last_name|birth_date|          address|phone_number|             email|        job_title|   updated_datetime|__deleted|__op|   __lsn|         __src_ts_ms|      __ds|
+-----------+----------+----------+----------+-----------------+------------+------------------+-----------------+-------------------+---------+----+--------+--------------------+----------+
|          3|      Dien|        Vo|2001-07-07|133 ABC, MNO, XYZ|  0865937134|  dienvo@gmail.com|    Data Engineer|2024-08-17 16:00:42|    false|   r|27105864|2025-06-22 03:43:...|2025-06-22|
|          6|     Cuong|        Vo|2001-07-07| 12 ABC, MNO, XYZ|      012356|cuongvo2@gmail.com|    Data Engineer|2024-08-17 16:23:58|    f

In [3]:
# Batch-read the customer topic directory to inspect the raw Parquet data.
# The CSV-only options header/inferSchema had no effect on a Parquet reader
# and were dropped.
df = spark.read.parquet(
    "hdfs://yarn-master:9000/user/hive/warehouse/staging/topics/product1.public.customer"
)
df.show(n=10, truncate=False)

[Stage 1:>                                                          (0 + 1) / 1]

+-----------+----------+----------+----------+-----------------+------------+------------------+-----------------+-------------------+---------+----+--------+----------------------+----------+
|customer_id|first_name|last_name |birth_date|address          |phone_number|email             |job_title        |updated_datetime   |__deleted|__op|__lsn   |__src_ts_ms           |__ds      |
+-----------+----------+----------+----------+-----------------+------------+------------------+-----------------+-------------------+---------+----+--------+----------------------+----------+
|3          |Dien      |Vo        |2001-07-07|133 ABC, MNO, XYZ|0865937134  |dienvo@gmail.com  |Data Engineer    |2024-08-17 16:00:42|false    |r   |27105864|2025-06-22 03:43:09.75|2025-06-22|
|6          |Cuong     |Vo        |2001-07-07|12 ABC, MNO, XYZ |012356      |cuongvo2@gmail.com|Data Engineer    |2024-08-17 16:23:58|false    |r   |27105864|2025-06-22 03:43:09.75|2025-06-22|
|4          |Cuong     |Vo Quoccc |

                                                                                

In [None]:
# Preview the derived rawvault customer table (up to 10 rows, untruncated).
spark.sql("select * from rawvault.product1_public_customer_der").show(n=10, truncate=False)