In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
def main():
   # Khởi SparkSession (nạp spark-defaults.conf và hive-site.xml)
   spark = SparkSession.builder \
       .appName("IcebergHiveCatalogExample") \
       .master("spark://spark-master:7077") \
       .enableHiveSupport() \
       .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
       .config("spark.sql.catalog.spark_catalog.type", "hive") \
       .config("spark.sql.catalog.spark_catalog.uri", "thrift://hive-metastore:9083") \
       .config("spark.jars", "/home/jovyan/jars/iceberg-spark-runtime-3.5_2.12-1.9.2.jar") \
       .getOrCreate()
   
   # Tạo DataFrame mẫu
   schema = StructType([
       StructField("id",   IntegerType(), False),
       StructField("name", StringType(),  False),
       StructField("age",  IntegerType(), True)
   ])
   data = [
       (1, "Nguyễn Văn A", 30),
       (2, "Trần Thị B",   25),
       (3, "Lê Văn C",     28),
       (4, "Nguyễn Lương Hoàng Tùng", 21)
   ]
   df = spark.createDataFrame(data, schema)
   
   print("=== Dữ liệu mẫu ===")
   df.show(truncate=False)
   
   # Tạo namespace (database) nếu chưa có
   spark.sql("CREATE NAMESPACE IF NOT EXISTS hive_catalog.default")
   
   # Tạo table Iceberg (nếu chưa tồn tại)
   spark.sql("""
     CREATE TABLE IF NOT EXISTS hive_catalog.default.users (
       id   INT,
       name STRING,
       age  INT
     ) USING iceberg
   """)
   
   # Ghi dữ liệu vào bảng (append)
   df.writeTo("hive_catalog.default.users").append()
   print(">>> Đã ghi dữ liệu vào hive_catalog.default.users")
   
   # Đọc lại và hiển thị
   spark.table("hive_catalog.default.users").show(truncate=False)
   print("=== Dữ liệu trong bảng Iceberg ===")
   
   # Hiển thị lịch sử commit, sắp xếp theo made_current_at DESC
   hist_df = spark.table("hive_catalog.default.users.history") \
                 .orderBy(col("made_current_at").desc())
   hist_df.show(10, truncate=False)
   print("=== Lịch sử commit của bảng (mới nhất trước) ===")
   
   # Xem danh sách snapshots
   snap_df = spark.table("hive_catalog.default.users.snapshots")
   snap_df.show(truncate=True)
   print("=== Danh sách snapshot hiện tại ===")
   
   spark.stop()

if __name__ == "__main__":
   main()

25/07/31 08:27:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


=== Dữ liệu mẫu ===


                                                                                

+---+-----------------------+---+
|id |name                   |age|
+---+-----------------------+---+
|1  |Nguyễn Văn A           |30 |
|2  |Trần Thị B             |25 |
|3  |Lê Văn C               |28 |
|4  |Nguyễn Lương Hoàng Tùng|21 |
+---+-----------------------+---+



                                                                                

>>> Đã ghi dữ liệu vào hive_catalog.default.users
+---+-----------------------+---+
|id |name                   |age|
+---+-----------------------+---+
|1  |Nguyễn Văn A           |30 |
|2  |Trần Thị B             |25 |
|1  |Nguyễn Văn A           |30 |
|3  |Lê Văn C               |28 |
|1  |Nguyễn Văn A           |30 |
|2  |Trần Thị B             |25 |
|4  |Nguyễn Lương Hoàng Tùng|21 |
|2  |Trần Thị B             |25 |
|3  |Lê Văn C               |28 |
|3  |Lê Văn C               |28 |
|4  |Nguyễn Lương Hoàng Tùng|21 |
|4  |Nguyễn Lương Hoàng Tùng|21 |
+---+-----------------------+---+

=== Dữ liệu trong bảng Iceberg ===
+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2025-07-31 08:27:39.371|6036102057306511325|659609171530464219 |true               |
|2025-07-31 08:23:24.622|6

In [10]:
spark.sparkContext.master

'spark://spark-master:7077'

In [1]:
from pyspark.sql import SparkSession

def main():
   spark = (
       SparkSession.builder
           .appName("IcebergTimeTravelExample") \
           .master("spark://spark-master:7077") \
           .enableHiveSupport() \
           .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
           .config("spark.sql.catalog.spark_catalog.type", "hive") \
           .config("spark.sql.catalog.spark_catalog.uri", "thrift://hive-metastore:9083") \
           .getOrCreate()
   )
   
   catalog_table = "hive_catalog.default.users"
   
   # Liệt kê các snapshot hiện có
   snapshots_df = spark.sql(f"SELECT snapshot_id, committed_at, summary FROM {catalog_table}.snapshots")
   snapshots_df.orderBy("committed_at").show(truncate=True)
   print("=== Danh sách snapshots ===")
   
   # Giả sử chúng ta lấy snapshot đầu tiên (cũ nhất) để time-travel:
   first_snapshot_id = snapshots_df.orderBy("committed_at").first()["snapshot_id"]
   print(f">>> Sẽ time-travel về snapshot_id = {first_snapshot_id}")
   
   # Đọc dữ liệu tại snapshot đó (version-as-of)
   df_time_travel = spark.read \
       .format("iceberg") \
       .option("snapshot-id", first_snapshot_id) \
       .load(catalog_table)
   df_time_travel.show(truncate=False)
   print(f"=== Dữ liệu tại snapshot {first_snapshot_id} ===")
   
   # Hoặc dùng SQL cú pháp VERSION AS OF
   df_sql = spark.sql(f"SELECT * FROM {catalog_table} VERSION AS OF {first_snapshot_id}")
   df_sql.show(truncate=False)
   print(f"=== (SQL) Dữ liệu tại snapshot {first_snapshot_id} ===")
   
   # Time-travel theo timestamp (ví dụ 5 phút trước)
   import datetime, pytz
   ts = (datetime.datetime.now(pytz.UTC) - datetime.timedelta(minutes=5)).strftime("%Y-%m-%d %H:%M:%S")
   print(f">>> Sẽ time-travel theo timestamp = {ts}")
   df_ts = spark.read \
       .format("iceberg") \
       .option("timestamp-as-of", ts) \
       .load(catalog_table)
   df_ts.show(truncate=False)
   print(f"=== Dữ liệu tại timestamp {ts} ===")
   
   spark.stop()

if __name__ == "__main__":
   main()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/31 08:34:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/31 08:34:36 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


NameError: name 'T' is not defined