In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
jars_dir = "jars"
jars = [
    os.path.join(jars_dir, "iceberg-spark-runtime-3.5_2.12-1.5.0.jar"),
    os.path.join(jars_dir, "hadoop-aws-3.3.4.jar"),
    os.path.join(jars_dir, "aws-java-sdk-bundle-1.12.262.jar")
]

data_dir = "../data"
file_path = os.path.join(data_dir, "company.csv")

In [3]:
spark = SparkSession.builder \
    .appName("LoadFileToIceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hive") \
    .config("spark.sql.catalog.iceberg.uri", "thrift://localhost:9083") \
    .config("spark.sql.catalog.iceberg.warehouse", "s3a://lakehouse") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.jars", ",".join(jars)) \
    .getOrCreate()

In [4]:
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.show(5)

+------+--------------------+---------+---------+---------+---------+-----------+
|symbol|          organ_name|icb_code1|icb_code2|icb_code3|icb_code4|issue_share|
+------+--------------------+---------+---------+---------+---------+-----------+
|   ACB|Ngân hàng Thương ...|     8301|     8300|     8350|     8355| 5136656599|
|   BCM|Tổng Công ty Đầu ...|     8000|     8600|     8630|     8633| 1035000000|
|   BID|Ngân hàng Thương ...|     8301|     8300|     8350|     8355| 7021361917|
|   CTG|Ngân hàng Thương ...|     8301|     8300|     8350|     8355| 5369991748|
|   DGC|Công ty Cổ phần T...|     1000|     1300|     1350|     1357|  379779286|
+------+--------------------+---------+---------+---------+---------+-----------+
only showing top 5 rows



In [5]:
df.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- organ_name: string (nullable = true)
 |-- icb_code1: integer (nullable = true)
 |-- icb_code2: integer (nullable = true)
 |-- icb_code3: integer (nullable = true)
 |-- icb_code4: integer (nullable = true)
 |-- issue_share: long (nullable = true)



In [6]:
spark.sql("CREATE SCHEMA IF NOT EXISTS iceberg.demo_db")
spark.sql("DROP TABLE IF EXISTS iceberg.demo_db.raw_company")

DataFrame[]

In [7]:
# spark.sql("""
# CREATE EXTERNAL TABLE IF NOT EXISTS iceberg.demo_db.raw_company (
#     symbol STRING,
#     organ_name STRING,
#     icb_code1 INT,
#     icb_code2 INT,
#     icb_code3 INT,
#     icb_code4 INT,
#     issue_share BIGINT
# )
# USING iceberg
# """)

In [13]:
# saveAsTable() method automatically creates table if not exist
df.write.format("iceberg") \
    .mode("append") \
    .saveAsTable("iceberg.demo_db.raw_company")

In [9]:
# table must exists first
df.writeTo("iceberg.demo_db.raw_company").append()

In [14]:
df_raw_company = spark.read.table("iceberg.demo_db.raw_company")
df_raw_company.count()

90

In [15]:
spark.sql("SELECT * FROM iceberg.demo_db.raw_company.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-08-28 17:20:...|7603303508460291768|               NULL|   append|s3a://lakehouse/d...|{spark.app.id -> ...|
|2025-08-28 17:20:...|2056573517944637087|7603303508460291768|   append|s3a://lakehouse/d...|{spark.app.id -> ...|
|2025-08-28 17:21:...|5508920788623149749|2056573517944637087|   append|s3a://lakehouse/d...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [12]:
spark.sql("DESCRIBE EXTENDED iceberg.demo_db.raw_company").show(100, False)

+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                             |comment|
+----------------------------+----------------------------------------------------------------------------------------------------------------------+-------+
|symbol                      |string                                                                                                                |NULL   |
|organ_name                  |string                                                                                                                |NULL   |
|icb_code1                   |int                                                                                                                   |NULL   |
|icb_code2                   |int                   