In [None]:
# Display metadata
import glob
import os
import shutil

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Show full cell contents (file paths, schema JSON) instead of truncating them.
pd.set_option('display.max_colwidth', None)

# Iceberg identifiers used by the cells below.
catalog_name = "iceberg_catalog"
database_name = "transformed"
table_name = "iceberg_demo"
table_name_partitioned = "iceberg_partitioned_demo"

# Table directories follow <warehouse>/<database>/<table>. They are derived from
# the names above so the directory that gets inspected or deleted always matches
# the table that gets created or dropped.
# NOTE(review): assumes the catalog's warehouse is this local directory - confirm.
warehouse_path = '/home/iceberg/warehouse'
non_partitioned_table_path = os.path.join(warehouse_path, database_name, table_name)
partitioned_table_path = os.path.join(warehouse_path, database_name, table_name_partitioned)

def display_metadata(demo_tbl_df, table_path):
    """Display the on-disk Iceberg file tree of the table stored at ``table_path``.

    Five sections are printed/displayed, in order:

    1. the entries under ``<table_path>/data``,
    2. the ``*.json`` files under ``<table_path>/metadata``,
    3. the current schema, partition spec and manifest-list path read from the
       most recently created metadata JSON file,
    4. selected columns of the manifest list (Avro) of the current snapshot,
    5. the full content of every manifest (Avro) named by that manifest list.

    Args:
        demo_tbl_df: Not used by this function; kept so existing calls that pass
            it keep working.
        table_path: Local directory of the table (the parent of ``data/`` and
            ``metadata/``).

    Raises:
        FileNotFoundError: If no ``*.json`` file exists under
            ``<table_path>/metadata``.

    Relies on the notebook globals ``spark`` and ``display``.
    """

    def _newest_first(pattern):
        # Files matching the glob pattern, most recently created first.
        return sorted(glob.iglob(pattern), key=os.path.getctime, reverse=True)

    meta_data_files = _newest_first(os.path.join(table_path, 'metadata/*.json'))
    data_files = _newest_first(os.path.join(table_path, 'data/*'))

    print("---------------- Data Files List -----------------------")
    data_files_df = spark.createDataFrame(data_files, StringType())
    display(data_files_df.toPandas())

    print("---------------- Meta Data Files List -----------------------")
    metadata_files_df = spark.createDataFrame(meta_data_files, StringType())
    display(metadata_files_df.toPandas())

    print("---------------- Meta Data Json -----------------------")
    if not meta_data_files:
        raise FileNotFoundError(
            f"No metadata JSON files found under {os.path.join(table_path, 'metadata')}"
        )
    latest_metadata_file = meta_data_files[0]

    # Each select() holds a single explode(), so the two arrays (snapshots,
    # schemas) are flattened in two consecutive steps. The final filters keep
    # only the row that pairs the current snapshot with the current schema.
    metadata_df = (
        spark.read.format("json").option("multiLine", True).load(latest_metadata_file)
        .select(
            F.col("current-schema-id"),
            F.col("current-snapshot-id"),
            F.col("partition-spec"),
            F.col("schemas"),
            F.explode(F.col("snapshots")).alias("snapshot"),
        )
        .select(
            F.col("current-schema-id"),
            F.col("current-snapshot-id"),
            F.col("partition-spec"),
            F.col("snapshot.snapshot-id"),
            F.col("snapshot.manifest-list"),
            F.explode(F.col("schemas")).alias("schema"),
        )
        .select(
            F.col("current-snapshot-id"),
            F.col("snapshot-id"),
            F.col("partition-spec"),
            F.col("current-schema-id"),
            F.col("schema.schema-id"),
            F.col("schema.fields").alias("schema"),
            F.col("manifest-list"),
        )
        .filter(F.col("current-schema-id") == F.col("schema-id"))
        .filter(F.col("current-snapshot-id") == F.col("snapshot-id"))
    )
    display(metadata_df.toPandas())

    print("---------------- Manifest List Avro -----------------------")
    manifest_list_files = [
        row["manifest-list"] for row in metadata_df.select("manifest-list").collect()
    ]
    print(manifest_list_files)
    if not manifest_list_files:
        # No row matched the current snapshot, so there is nothing further to read.
        print("No manifest list found for the current snapshot.")
        return
    manifest_list_df = spark.read.format("avro").load(manifest_list_files).select(
        F.col("added_snapshot_id"),
        F.col("added_data_files_count"),
        F.col("existing_data_files_count"),
        F.col("deleted_data_files_count"),
        F.col("added_rows_count"),
        F.col("deleted_rows_count"),
        F.col("manifest_path"),
    )
    display(manifest_list_df.toPandas())

    print("---------------- Manifest Avro -----------------------")
    manifest_files = [
        row["manifest_path"] for row in manifest_list_df.select("manifest_path").collect()
    ]
    print(manifest_files)
    if not manifest_files:
        print("The manifest list references no manifests.")
        return
    manifest_df = spark.read.format("avro").load(manifest_files)
    display(manifest_df.toPandas())




In [6]:
# List the catalogs, make sure the demo namespace exists, then list the
# namespaces of the demo catalog. IF NOT EXISTS keeps the cell re-runnable.
display(spark.sql("show catalogs").toPandas())
spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog_name}.{database_name}")
display(spark.sql(f"show databases in {catalog_name}").toPandas())

Unnamed: 0,catalog
0,iceberg_catalog


Unnamed: 0,namespace
0,transformed


In [2]:
# Create Demo Iceberg Table
fact_sales_df = spark.read.format("parquet").load("/home/datalake/transformed.fact_sales.parquet/")

# Start from a clean slate: drop the catalog entry, then remove whatever is
# left in the table directory (a missing directory is not an error).
spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.{database_name}.{table_name}")
shutil.rmtree(non_partitioned_table_path, ignore_errors=True)

fact_sales_df.writeTo(f"{catalog_name}.{database_name}.{table_name}").create()
print("Table created.")

                                                                                

Table created.


In [None]:
# Load the demo table through the catalog, then show its data and metadata files.
demo_iceberg_df = spark.table(".".join((catalog_name, database_name, table_name)))
display_metadata(demo_iceberg_df, non_partitioned_table_path)

In [None]:
# Update a record in Iceberg Table
# One predicate drives the before look-up, the UPDATE and the after look-up,
# so all three always address the same row.
order_item_filter = "order_hash_key = '733398b50242a8734489a906a12a793f' and order_item_id = '1'"

# NOTE(review): input_file_name() may come back empty for Iceberg reads; Iceberg
# exposes the file path through its `_file` metadata column - confirm.
display(demo_iceberg_df.filter(order_item_filter).withColumn("file_name", F.input_file_name()).toPandas())

# Fully qualified with the catalog, like every other statement in this notebook.
iceberg_update_output = spark.sql(f"""
UPDATE {catalog_name}.{database_name}.{table_name}
SET shipping_country = 'USA'
WHERE {order_item_filter}
""")
spark.catalog.clearCache()
display(demo_iceberg_df.filter(order_item_filter).withColumn("file_name", F.input_file_name()).toPandas())

In [None]:
# Show the table's data and metadata files again for comparison.
display_metadata(demo_iceberg_df, non_partitioned_table_path)

In [None]:
# Row-level DELETE - remove a single record, then show the resulting files.
spark.sql(
    f"DELETE FROM {catalog_name}.{database_name}.{table_name} "
    "WHERE order_hash_key = '733398b50242a8734489a906a12a793f' and order_item_id = '1'"
)
display_metadata(demo_iceberg_df, non_partitioned_table_path)

In [None]:
# Schema Evolution - ADD a Column
# Describe the table, add the string column `shipping_city`, describe it again,
# then show the table's data and metadata files.
display(spark.sql("DESCRIBE {}.{}.{}".format(catalog_name, database_name, table_name)).toPandas())
spark.sql(
    "ALTER TABLE {}.{}.{} ADD  COLUMNS (\n"
    "                shipping_city string\n"
    "                )".format(catalog_name, database_name, table_name)
)
display(spark.sql("DESCRIBE {}.{}.{}".format(catalog_name, database_name, table_name)).toPandas())
display_metadata(demo_iceberg_df, non_partitioned_table_path)

In [None]:
# Schema Evolution - Remove a Column
# Describe the table, drop `ctrl_load_date`, describe it again,
# then show the table's data and metadata files.
display(spark.sql("DESCRIBE {}.{}.{}".format(catalog_name, database_name, table_name)).toPandas())
spark.sql(
    "ALTER TABLE {}.{}.{} DROP COLUMN ctrl_load_date".format(catalog_name, database_name, table_name)
)
display(spark.sql("DESCRIBE {}.{}.{}".format(catalog_name, database_name, table_name)).toPandas())
display_metadata(demo_iceberg_df, non_partitioned_table_path)

In [None]:
# Schema Evolution - Rename a Column
# Describe the table, rename `subtotal` to `sub_total`, describe it again,
# then show the table's data and metadata files.
display(spark.sql("DESCRIBE {}.{}.{}".format(catalog_name, database_name, table_name)).toPandas())
spark.sql(
    "ALTER TABLE {}.{}.{} RENAME COLUMN subtotal TO sub_total".format(catalog_name, database_name, table_name)
)
display(spark.sql("DESCRIBE {}.{}.{}".format(catalog_name, database_name, table_name)).toPandas())
display_metadata(demo_iceberg_df, non_partitioned_table_path)

In [None]:
# Clear Spark's cache, re-read the table through the catalog and preview five rows.
spark.catalog.clearCache()
demo_iceberg_final = spark.table(".".join((catalog_name, database_name, table_name)))
display(demo_iceberg_final.limit(5).toPandas())

In [None]:
# Create Partitioned Iceberg Table
fact_sales_df = spark.read.format("parquet").load("/home/datalake/transformed.fact_sales.parquet/")

# Start from a clean slate: drop the catalog entry, then remove whatever is
# left in the table directory (a missing directory is not an error).
spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.{database_name}.{table_name_partitioned}")
shutil.rmtree(partitioned_table_path, ignore_errors=True)

# Rows are sorted by the partition source column within each Spark partition
# before being written with a daily partition transform on that column.
(
    fact_sales_df
    .sortWithinPartitions("ctrl_load_date")
    .writeTo(f"{catalog_name}.{database_name}.{table_name_partitioned}")
    .partitionedBy(F.days("ctrl_load_date"))
    .createOrReplace()
)
print("Table created.")

# Pass the partitioned table itself, not the non-partitioned demo DataFrame.
demo_iceberg_partitioned_df = spark.table(f"{catalog_name}.{database_name}.{table_name_partitioned}")
display_metadata(demo_tbl_df=demo_iceberg_partitioned_df, table_path=partitioned_table_path)

In [None]:
# Preview the partitioned table. DataFrame.show() prints and returns None, so
# the rows are converted to pandas and handed to display() instead.
display(spark.sql(f"SELECT * FROM {catalog_name}.{database_name}.{table_name_partitioned} LIMIT 10").toPandas())

In [None]:
# Compact the demo table's data files.
# The Java `SparkActions` API is not callable from a Python cell; from PySpark
# the same action is run through Iceberg's `rewrite_data_files` stored procedure.
# NOTE(review): CALL requires the Iceberg Spark SQL extensions to be enabled on
# the session - confirm against the session configuration.
target_file_size_bytes = 500 * 1024 * 1024  # 500 MiB
rewrite_result_df = spark.sql(f"""
CALL {catalog_name}.system.rewrite_data_files(
    table => '{database_name}.{table_name}',
    options => map('target-file-size-bytes', '{target_file_size_bytes}')
)
""")
display(rewrite_result_df.toPandas())

# Re-read the table after the rewrite; the trailing expression is the cell's output.
demo_icerberg_rewrite_table = spark.table(f"{catalog_name}.{database_name}.{table_name}")
demo_icerberg_rewrite_table