## Introduction to Iceberg Architecture

In [1]:
!pip3 install -r requirements.txt

Collecting cmlbootstrap
  Cloning https://github.com/fastforwardlabs/cmlbootstrap to /tmp/pip-install-493gqeea/cmlbootstrap_3b27dacd91bc4abc9cf0f1639cc4229b
  Running command git clone -q https://github.com/fastforwardlabs/cmlbootstrap /tmp/pip-install-493gqeea/cmlbootstrap_3b27dacd91bc4abc9cf0f1639cc4229b


#### Launching a Spark Session with Iceberg

In [2]:
import cml.data_v1 as cmldata

CONNECTION_NAME = "go01-aw-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

# Sample usage to run query through spark
EXAMPLE_SQL_QUERY = "show databases"
spark.sql(EXAMPLE_SQL_QUERY).show()

23/09/04 22:19:43 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
Setting spark.hadoop.yarn.resourcemanager.principal to pauldefusco
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/04 22:19:44 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
23/09/04 

+--------------------+
|           namespace|
+--------------------+
|         01_car_data|
|           01_car_dw|
|              adb101|
|            airlines|
|        airlines_csv|
|    airlines_iceberg|
|airlines_iceberg_...|
|      airlines_mjain|
|          airquality|
|                ajvp|
|          atlas_demo|
|            bankdemo|
|          bca_jps_l0|
|        cde_workshop|
|             cdedemo|
|        cdp_overview|
|      ceht_open_data|
|        ceht_scratch|
| ceht_transportation|
|        cgsifacebook|
+--------------------+
only showing top 20 rows



In [3]:
spark.sparkContext.getConf().getAll()

23/09/04 22:19:56 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.


[('spark.eventLog.enabled', 'true'),
 ('spark.network.crypto.enabled', 'true'),
 ('spark.sql.hive.hwc.execution.mode', 'spark'),
 ('spark.driver.port', '32809'),
 ('spark.jars',
  '/opt/spark/optional-lib/hive-warehouse-connector-assembly.jar,/opt/spark/optional-lib/iceberg-hive-runtime.jar,/opt/spark/optional-lib/iceberg-spark-runtime.jar'),
 ('spark.kerberos.renewal.credentials', 'ccache'),
 ('spark.sql.catalog.spark_catalog',
  'org.apache.iceberg.spark.SparkSessionCatalog'),
 ('spark.dynamicAllocation.maxExecutors', '49'),
 ('spark.eventLog.dir', 'file:///sparkeventlogs'),
 ('spark.hadoop.yarn.resourcemanager.principal', 'pauldefusco'),
 ('spark.kubernetes.driver.annotation.cluster-autoscaler.kubernetes.io/safe-to-evict',
  'false'),
 ('spark.ui.port', '20049'),
 ('spark.yarn.access.hadoopFileSystems',
  's3a://go01-demo/warehouse/tablespace/external/hive'),
 ('spark.sql.extensions',
  'com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension,org.apache.iceberg.spark.extensions.Icebe

### Iceberg Architecture

![alt text](../img/iceberg-metadata.png)

#### Iceberg Catalog

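Iceberg catalogs let Spark SQL commands manage tables and load them by name. Each catalog is configured through Spark properties under `spark.sql.catalog.(catalog_name)`. In this session, for example, `spark.sql.catalog.spark_catalog` is set to `org.apache.iceberg.spark.SparkSessionCatalog`, as shown in the configuration output above.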
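
As a rough illustration of that naming scheme, the sketch below assembles the property keys for a catalog as a plain dictionary. The helper `iceberg_catalog_conf`, the catalog name `my_catalog`, and the bucket `s3a://example-bucket/warehouse` are hypothetical. The implementation class names and the `type` and `warehouse` properties are the ones documented for Iceberg's Spark integration. In this notebook the CML data connection already sets these properties, so the block only shows how the keys are put together.

```python
def iceberg_catalog_conf(catalog_name, impl, catalog_type=None, warehouse=None):
    """Build Spark conf entries for one Iceberg catalog (illustrative helper)."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    conf = {prefix: impl}  # the bare key names the catalog implementation class
    if catalog_type is not None:
        conf[f"{prefix}.type"] = catalog_type
    if warehouse is not None:
        conf[f"{prefix}.warehouse"] = warehouse
    return conf


# The session catalog used in this notebook (the "hive" type is an assumption).
session_catalog = iceberg_catalog_conf(
    "spark_catalog",
    "org.apache.iceberg.spark.SparkSessionCatalog",
    catalog_type="hive",
)

# A hypothetical second catalog backed by a warehouse path.
extra_catalog = iceberg_catalog_conf(
    "my_catalog",
    "org.apache.iceberg.spark.SparkCatalog",
    catalog_type="hadoop",
    warehouse="s3a://example-bucket/warehouse",
)

for key, value in {**session_catalog, **extra_catalog}.items():
    print(f"{key} = {value}")
```

Each entry could then be passed to `SparkSession.builder.config(key, value)` when a session is built by hand.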

In [4]:
# Show catalog and database
spark.sql("SHOW CURRENT NAMESPACE").show()

+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|  default|
+-------------+---------+



In [5]:
# Create a new database
#spark.sql("DROP DATABASE IF EXISTS spark_catalog.lakehouse")
spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.lakehouse")
spark.sql("USE spark_catalog.lakehouse")

DataFrame[]

In [6]:
spark.sql("USE spark_catalog.lakehouse")

DataFrame[]

In [7]:
# Show catalog and database
spark.sql("SHOW CURRENT NAMESPACE").show()

+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|lakehouse|
+-------------+---------+



#### Create an Iceberg Table with Spark SQL

In [8]:
spark.sql("DROP TABLE IF EXISTS lakehouse.coffees_table PURGE")

23/09/04 22:19:56 WARN HiveMetaStoreClient: Failed to connect to the MetaStore Server...
23/09/04 22:20:14 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

DataFrame[]

#### Test 1: Copy-on-Write (COW)

In [9]:
spark.sql("CREATE TABLE IF NOT EXISTS lakehouse.coffees_table (coffee_id BIGINT, coffee_size STRING, coffee_sale_ts TIMESTAMP)\
          USING ICEBERG\
          PARTITIONED BY (months(coffee_sale_ts))\
          TBLPROPERTIES ('write.delete.mode'='copy-on-write',\
                          'write.update.mode'='copy-on-write',\
                          'write.merge.mode'='copy-on-write',\
                          'format-version' = '2')")

DataFrame[]

#### Verify that a Metadata JSON file has been created under the Metadata directory

In [10]:
from CatalogUtil import CatalogUtil

In [11]:
catalogUtil = CatalogUtil("lakehouse", "coffees_table", "go01-demo", "s3")

In [12]:
print("These are now the files in the Iceberg Metadata Layer:")
catalogUtil.count_metadata_layer_files()

These are now the files in the Iceberg Metadata Layer:
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json
Total number of files in the Metadata Layer: 1


1

In [13]:
print("These are now the metadata files in the Iceberg Metadata Layer:")
catalogUtil.count_metadata_files()

These are now the metadata files in the Iceberg Metadata Layer:
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json
Total Number of Metadata Files: 1


1

In [14]:
print("These are now the manifest lists in the Iceberg Metadata Layer:")
catalogUtil.count_manifest_lists()

These are now the manifest lists in the Iceberg Metadata Layer:
Total Number of Manifest Lists: 0


0

In [15]:
print("These are now the manifest files in the Iceberg Metadata Layer:")
catalogUtil.count_manifest_files()

These are now the manifest files in the Iceberg Metadata Layer:
Total Number of Manifest Files: 0


0

In [16]:
metadata_file_path = catalogUtil.get_latest_metadata_file()

Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json


In [17]:
catalogUtil.print_metadata_file(spark, metadata_file_path)



<IPython.core.display.JSON object>

![alt text](../img/s3_metadata.png)

#### Notice that no snapshots, manifests, or data files have been created, because no data has been inserted yet.

In [18]:
spark.sql("SELECT * FROM lakehouse.coffees_table.history").show()

+---------------+-----------+---------+-------------------+
|made_current_at|snapshot_id|parent_id|is_current_ancestor|
+---------------+-----------+---------+-------------------+
+---------------+-----------+---------+-------------------+



In [19]:
spark.sql("SELECT * FROM lakehouse.coffees_table.snapshots;").show()

+------------+-----------+---------+---------+-------------+-------+
|committed_at|snapshot_id|parent_id|operation|manifest_list|summary|
+------------+-----------+---------+---------+-------------+-------+
+------------+-----------+---------+---------+-------------+-------+



In [20]:
spark.sql("SELECT * FROM lakehouse.coffees_table.files;").show()

+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+
|content|file_path|file_format|spec_id|partition|record_count|file_size_in_bytes|column_sizes|value_counts|null_value_counts|nan_value_counts|lower_bounds|upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|
+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+
+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+



In [21]:
spark.sql("SELECT * FROM lakehouse.coffees_table.manifests;").show()

+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+
|content|path|length|partition_spec_id|added_snapshot_id|added_data_files_count|existing_data_files_count|deleted_data_files_count|added_delete_files_count|existing_delete_files_count|deleted_delete_files_count|partition_summaries|
+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+
+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+



In [22]:
spark.sql("SELECT * FROM lakehouse.coffees_table.all_data_files;").show()

+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+
|content|file_path|file_format|spec_id|partition|record_count|file_size_in_bytes|column_sizes|value_counts|null_value_counts|nan_value_counts|lower_bounds|upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|
+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+
+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+



In [23]:
spark.sql("SELECT * FROM lakehouse.coffees_table.all_manifests;").show()

+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+---------------------+
|content|path|length|partition_spec_id|added_snapshot_id|added_data_files_count|existing_data_files_count|deleted_data_files_count|added_delete_files_count|existing_delete_files_count|deleted_delete_files_count|partition_summaries|reference_snapshot_id|
+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+---------------------+
+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+-------

### Table Insert

In [24]:
from pyspark.sql.functions import date_format

In [25]:
#Coffee_id = 1, Coffee_size = venti, coffee_sale_ts = 2023-09-01

spark.sql("INSERT INTO lakehouse.coffees_table VALUES (1, 'venti', cast(date_format('2023-09-01 10:00:00', 'yyyy-MM-dd HH:mm:ss') as timestamp))")

                                                                                

DataFrame[]

#### Data has been added to the data folder

In [26]:
QUERY = "select h.made_current_at,\
            s.operation,\
            h.snapshot_id,\
            h.is_current_ancestor,\
            s.summary['spark.app.id']\
        from lakehouse.coffees_table.history h\
        join lakehouse.coffees_table.snapshots s\
            on h.snapshot_id = s.snapshot_id\
            order by made_current_at;"

In [27]:
spark.sql(QUERY).toPandas()

23/09/04 22:20:42 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.


Unnamed: 0,made_current_at,operation,snapshot_id,is_current_ancestor,summary[spark.app.id]
0,2023-09-04 22:20:37.323,append,7462507546055427409,True,spark-application-1693865986682


#### Notice there are now two JSON files and two Avro files.

The first JSON file is the metadata file written when the table was created; its name is prefixed with 00000. The second JSON file, prefixed with 00001, is the new metadata file that reflects the one-row insert. The two Avro files are the manifest list and the manifest file for the new snapshot.

In [28]:
print("These are now the files in the Iceberg Metadata Layer:")
catalogUtil.count_metadata_layer_files()

These are now the files in the Iceberg Metadata Layer:
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00001-7d5f111e-852b-4cae-bfa3-a574cdb56a14.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/ee02a218-f8f4-449c-8781-ce82b0450e0c-m0.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/snap-7462507546055427409-1-ee02a218-f8f4-449c-8781-ce82b0450e0c.avro
Total number of files in the Metadata Layer: 4


4

In [29]:
print("These are now the metadata files in the Iceberg Metadata Layer:")
catalogUtil.count_metadata_files()

These are now the metadata files in the Iceberg Metadata Layer:
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00001-7d5f111e-852b-4cae-bfa3-a574cdb56a14.metadata.json
Total Number of Metadata Files: 2


2
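
The numeric prefix on each metadata file can also be read programmatically. The sketch below is a minimal example that assumes the naming seen in this notebook (a five-digit version, a UUID, then `.metadata.json`); other catalogs may name their metadata files differently. The helper `metadata_version` is hypothetical and is not part of `CatalogUtil`.

```python
import re

# Assumed pattern: five-digit version, a 36-character UUID, ".metadata.json".
METADATA_NAME = re.compile(r"^(\d{5})-[0-9a-f-]{36}\.metadata\.json$")


def metadata_version(path):
    """Return the integer version encoded in a metadata file name."""
    name = path.rsplit("/", 1)[-1]
    match = METADATA_NAME.match(name)
    if match is None:
        raise ValueError(f"not a metadata file name: {name}")
    return int(match.group(1))


# File names copied from the listing above; the directory prefix is shortened.
paths = [
    "coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json",
    "coffees_table/metadata/00001-7d5f111e-852b-4cae-bfa3-a574cdb56a14.metadata.json",
]

# The highest version is the most recently written metadata file.
latest = max(paths, key=metadata_version)
print(metadata_version(latest), latest)
```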

In [30]:
print("These are now the manifest lists in the Iceberg Metadata Layer:")
catalogUtil.count_manifest_lists()

These are now the manifest lists in the Iceberg Metadata Layer:
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/snap-7462507546055427409-1-ee02a218-f8f4-449c-8781-ce82b0450e0c.avro
Total Number of Manifest Lists: 1


1

In [31]:
print("These are now the manifest files in the Iceberg Metadata Layer:")
catalogUtil.count_manifest_files()

These are now the manifest files in the Iceberg Metadata Layer:
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/ee02a218-f8f4-449c-8781-ce82b0450e0c-m0.avro
Total Number of Manifest Files: 1


1

In [32]:
metadata_file_path_list = catalogUtil.get_all_metadata_files()

Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00001-7d5f111e-852b-4cae-bfa3-a574cdb56a14.metadata.json


In [33]:
catalogUtil.print_metadata_file(spark, metadata_file_path_list[0])



<IPython.core.display.JSON object>

In [34]:
catalogUtil.print_metadata_file(spark, metadata_file_path_list[1])

                                                                                

<IPython.core.display.JSON object>

The Avro file with the "snap" prefix is the manifest list.

In [35]:
manifest_lists_list = catalogUtil.get_all_manifest_lists()

Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/snap-7462507546055427409-1-ee02a218-f8f4-449c-8781-ce82b0450e0c.avro


In [36]:
catalogUtil.print_manifest_file(spark, manifest_lists_list[0]) 

                                                                                

<IPython.core.display.JSON object>

The Avro file without the "snap" prefix is the manifest file.

In [37]:
manifest_files_list = catalogUtil.get_all_manifest_files()

Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/ee02a218-f8f4-449c-8781-ce82b0450e0c-m0.avro
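
The naming conventions seen so far (`.metadata.json` for metadata files, a `snap-` prefix for manifest lists, and any other `.avro` file for manifest files) are enough to sort the metadata folder by file type. The sketch below is one way to do that. The function `classify_metadata_layer_file` is hypothetical and may differ from how `CatalogUtil` tells these files apart.

```python
from collections import Counter


def classify_metadata_layer_file(path):
    """Classify a file in the metadata folder by its file name alone."""
    name = path.rsplit("/", 1)[-1]
    if name.endswith(".metadata.json"):
        return "metadata file"
    if name.endswith(".avro"):
        # Manifest lists carry the "snap-" prefix; other Avro files are manifests.
        return "manifest list" if name.startswith("snap-") else "manifest file"
    return "unknown"


# File names copied from the listings above; the directory prefix is shortened.
files = [
    "metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json",
    "metadata/00001-7d5f111e-852b-4cae-bfa3-a574cdb56a14.metadata.json",
    "metadata/ee02a218-f8f4-449c-8781-ce82b0450e0c-m0.avro",
    "metadata/snap-7462507546055427409-1-ee02a218-f8f4-449c-8781-ce82b0450e0c.avro",
]

counts = Counter(classify_metadata_layer_file(f) for f in files)
for kind, count in sorted(counts.items()):
    print(kind, count)
```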


In [38]:
catalogUtil.print_manifest_file(spark, manifest_files_list[0]) 

                                                                                

<IPython.core.display.JSON object>

In [39]:
data_files_list = catalogUtil.get_all_data_layer_files()
# could also query the all_data_files metadata table:
# spark.sql("SELECT * FROM lakehouse.coffees_table.all_data_files;").toPandas()

Data File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-09/00072-211-d8611c4f-62fa-464e-b998-1dccb751e0f5-00001.parquet


In [40]:
catalogUtil.print_data_file(spark, data_files_list[0])

[Stage 22:>                                                         (0 + 1) / 1]

+---------+-----------+-------------------+
|coffee_id|coffee_size|     coffee_sale_ts|
+---------+-----------+-------------------+
|        1|      venti|2023-09-01 10:00:00|
+---------+-----------+-------------------+



                                                                                

### Table Merge Into

Create a staging table

In [41]:
spark.sql("DROP TABLE IF EXISTS lakehouse.coffee_staging PURGE")

                                                                                

DataFrame[]

In [42]:
spark.sql("CREATE TABLE IF NOT EXISTS lakehouse.coffee_staging\
            (coffee_id BIGINT, coffee_size STRING, coffee_sale_ts TIMESTAMP)\
            USING iceberg\
            PARTITIONED BY (months(coffee_sale_ts))")

DataFrame[]

In [43]:
spark.sql("INSERT INTO lakehouse.coffee_staging\
            VALUES (1, 'grande', cast(date_format('2023-07-01 11:00:00', 'yyyy-MM-dd HH:mm:ss') as timestamp)),\
            (2, 'grande', cast(date_format('2023-07-01 11:10:00', 'yyyy-MM-dd HH:mm:ss') as timestamp)),\
            (3, 'tall', cast(date_format('2023-04-01 12:01:00', 'yyyy-MM-dd HH:mm:ss') as timestamp))")

# Row: coffee_id = 1, coffee_size = grande, coffee_sale_ts = 2023-07-01 11:00:00
# Row: coffee_id = 2, coffee_size = grande, coffee_sale_ts = 2023-07-01 11:10:00
# Row: coffee_id = 3, coffee_size = tall, coffee_sale_ts = 2023-04-01 12:01:00

                                                                                

DataFrame[]

Merge the staging table into the coffees table

In [44]:
spark.sql("MERGE INTO lakehouse.coffees_table c\
            USING (SELECT * FROM lakehouse.coffee_staging) s\
            ON c.coffee_id = s.coffee_id \
            WHEN MATCHED THEN UPDATE SET c.coffee_size = s.coffee_size\
            WHEN NOT MATCHED THEN INSERT *")

23/09/04 22:21:22 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
                                                                                

DataFrame[]

In [45]:
spark.sql("SELECT * FROM lakehouse.coffees_table.snapshots;").toPandas()

Unnamed: 0,committed_at,snapshot_id,parent_id,operation,manifest_list,summary
0,2023-09-04 22:20:37.323,7462507546055427409,,append,s3a://go01-demo/warehouse/tablespace/external/...,{'spark.app.id': 'spark-application-1693865986...
1,2023-09-04 22:21:30.476,7257983078605640789,7.462508e+18,overwrite,s3a://go01-demo/warehouse/tablespace/external/...,"{'added-data-files': '3', 'total-equality-dele..."


In [46]:
spark.sql("SELECT * FROM lakehouse.coffees_table.manifests;").toPandas()

                                                                                

Unnamed: 0,content,path,length,partition_spec_id,added_snapshot_id,added_data_files_count,existing_data_files_count,deleted_data_files_count,added_delete_files_count,existing_delete_files_count,deleted_delete_files_count,partition_summaries
0,0,s3a://go01-demo/warehouse/tablespace/external/...,7210,0,7257983078605640789,3,0,0,0,0,0,"[(False, False, 2023-04, 2023-09)]"
1,0,s3a://go01-demo/warehouse/tablespace/external/...,7071,0,7257983078605640789,0,0,1,0,0,0,"[(False, False, 2023-09, 2023-09)]"


In [47]:
spark.sql("SELECT * FROM lakehouse.coffees_table.all_data_files;").toPandas()

                                                                                

Unnamed: 0,content,file_path,file_format,spec_id,partition,record_count,file_size_in_bytes,column_sizes,value_counts,null_value_counts,nan_value_counts,lower_bounds,upper_bounds,key_metadata,split_offsets,equality_ids,sort_order_id
0,0,s3a://go01-demo/warehouse/tablespace/external/...,PARQUET,0,"(644,)",1,976,"{1: 33, 2: 34, 3: 39}","{1: 1, 2: 1, 3: 1}","{1: 0, 2: 0, 3: 0}",{},"{1: [1, 0, 0, 0, 0, 0, 0, 0], 2: [118, 101, 11...","{1: [1, 0, 0, 0, 0, 0, 0, 0], 2: [118, 101, 11...",,[4],,0
1,0,s3a://go01-demo/warehouse/tablespace/external/...,PARQUET,0,"(642,)",1,999,"{1: 39, 2: 41, 3: 39}","{1: 1, 2: 1, 3: 1}","{1: 0, 2: 0, 3: 0}",{},"{1: [2, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97...","{1: [2, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97...",,[4],,0
2,0,s3a://go01-demo/warehouse/tablespace/external/...,PARQUET,0,"(644,)",1,999,"{1: 39, 2: 41, 3: 39}","{1: 1, 2: 1, 3: 1}","{1: 0, 2: 0, 3: 0}",{},"{1: [1, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97...","{1: [1, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97...",,[4],,0
3,0,s3a://go01-demo/warehouse/tablespace/external/...,PARQUET,0,"(639,)",1,985,"{1: 39, 2: 39, 3: 39}","{1: 1, 2: 1, 3: 1}","{1: 0, 2: 0, 3: 0}",{},"{1: [3, 0, 0, 0, 0, 0, 0, 0], 2: [116, 97, 108...","{1: [3, 0, 0, 0, 0, 0, 0, 0], 2: [116, 97, 108...",,[4],,0
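
The `partition` column above holds integers such as `(644,)` rather than dates. Under the Iceberg `months` transform, the partition value is the number of months since 1970-01. The sketch below reproduces that arithmetic for the timestamps stored in this table. The helper names `months_since_epoch` and `month_label` are hypothetical.

```python
from datetime import datetime


def months_since_epoch(ts):
    """Number of whole months between 1970-01 and the month of ts."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def month_label(ordinal):
    """Turn a month ordinal back into the yyyy-MM label used in folder names."""
    years, month_index = divmod(ordinal, 12)
    return f"{1970 + years:04d}-{month_index + 1:02d}"


# The sale timestamps stored in lakehouse.coffees_table after the merge.
sale_times = [
    datetime(2023, 9, 1, 10, 0, 0),
    datetime(2023, 7, 1, 11, 10, 0),
    datetime(2023, 4, 1, 12, 1, 0),
]

for ts in sale_times:
    ordinal = months_since_epoch(ts)
    print(ts, ordinal, month_label(ordinal))
```

For September 2023 this gives (2023 - 1970) * 12 + 8 = 644, which matches the partition value of the two files under `coffee_sale_ts_month=2023-09`.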


RECAP: The target table, coffees_table, started out empty. The first insert created one new metadata file, one manifest list, one manifest file, and one data file.
We then ran a MERGE INTO from the staging table. Of the three staging rows compared against the target, one matched and two did not.
The matched row was rewritten to a brand new data file in the same partition (coffee_sale_ts_month=2023-09). That partition folder now holds two data files, but only the new one belongs to the current snapshot; the old one is kept for earlier snapshots.
The two unmatched rows were written to two new data files, one in each of their respective partitions (2023-07 and 2023-04). Had they fallen into the same partition, they might have been written to the same data file (next example).
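
The row-level effect of the merge can be traced without Spark. The sketch below is a plain-Python approximation of the `WHEN MATCHED` / `WHEN NOT MATCHED` logic applied to the same sample rows. It only models which rows are updated or inserted, not how Iceberg writes files. The function `merge_into` is hypothetical.

```python
def merge_into(target, staging):
    """Apply the merge rules: update coffee_size on match, insert otherwise."""
    merged = {cid: dict(row) for cid, row in target.items()}
    matched, inserted = [], []
    for row in staging:
        cid = row["coffee_id"]
        if cid in merged:
            # WHEN MATCHED THEN UPDATE SET c.coffee_size = s.coffee_size
            merged[cid]["coffee_size"] = row["coffee_size"]
            matched.append(cid)
        else:
            # WHEN NOT MATCHED THEN INSERT *
            merged[cid] = {
                "coffee_size": row["coffee_size"],
                "coffee_sale_ts": row["coffee_sale_ts"],
            }
            inserted.append(cid)
    return merged, matched, inserted


# Target table after the first insert, keyed by coffee_id.
target = {1: {"coffee_size": "venti", "coffee_sale_ts": "2023-09-01 10:00:00"}}

# Rows inserted into the staging table.
staging = [
    {"coffee_id": 1, "coffee_size": "grande", "coffee_sale_ts": "2023-07-01 11:00:00"},
    {"coffee_id": 2, "coffee_size": "grande", "coffee_sale_ts": "2023-07-01 11:10:00"},
    {"coffee_id": 3, "coffee_size": "tall", "coffee_sale_ts": "2023-04-01 12:01:00"},
]

merged, matched, inserted = merge_into(target, staging)
print("matched:", matched, "inserted:", inserted)
for cid in sorted(merged):
    print(cid, merged[cid])
```

Because the `UPDATE` clause sets only `coffee_size`, the matched row keeps its original September timestamp and therefore stays in the 2023-09 partition.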

In [48]:
print("These are now the files in the Iceberg Metadata Layer:")
catalogUtil.count_metadata_layer_files()

These are now the files in the Iceberg Metadata Layer:
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00001-7d5f111e-852b-4cae-bfa3-a574cdb56a14.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00002-3512e62e-a490-4057-802c-09060baa6911.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/cf556cc3-2af0-4bd8-9228-d4c11755e0b7-m0.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/cf556cc3-2af0-4bd8-9228-d4c11755e0b7-m1.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/ee02a218-f8f4-449c-8781-ce82b0450e0c-m0.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/sn

8

In [49]:
print("These are now the metadata files in the Iceberg Metadata Layer:")
catalogUtil.count_metadata_files()

These are now the metadata files in the Iceberg Metadata Layer:
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00001-7d5f111e-852b-4cae-bfa3-a574cdb56a14.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00002-3512e62e-a490-4057-802c-09060baa6911.metadata.json
Total Number of Metadata Files: 3


3

In [50]:
print("These are now the manifest lists in the Iceberg Metadata Layer:")
catalogUtil.count_manifest_lists()

These are now the manifest lists in the Iceberg Metadata Layer:
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/snap-7257983078605640789-1-cf556cc3-2af0-4bd8-9228-d4c11755e0b7.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/snap-7462507546055427409-1-ee02a218-f8f4-449c-8781-ce82b0450e0c.avro
Total Number of Manifest Lists: 2


2

In [51]:
print("These are now the manifest files in the Iceberg Metadata Layer:")
catalogUtil.count_manifest_files()

These are now the manifest files in the Iceberg Metadata Layer:
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/cf556cc3-2af0-4bd8-9228-d4c11755e0b7-m0.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/cf556cc3-2af0-4bd8-9228-d4c11755e0b7-m1.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/ee02a218-f8f4-449c-8781-ce82b0450e0c-m0.avro
Total Number of Manifest Files: 3


3

In [52]:
metadata_file_path_list = catalogUtil.get_all_metadata_layer_files()

Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00001-7d5f111e-852b-4cae-bfa3-a574cdb56a14.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00002-3512e62e-a490-4057-802c-09060baa6911.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/cf556cc3-2af0-4bd8-9228-d4c11755e0b7-m0.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/cf556cc3-2af0-4bd8-9228-d4c11755e0b7-m1.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/ee02a218-f8f4-449c-8781-ce82b0450e0c-m0.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/snap-7257983078605640789-1-cf556cc3-2af0-4bd8-9228-d4c117

In [53]:
data_files_list = catalogUtil.get_all_data_layer_files()

Data File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-04/00156-1033-a93cedac-db38-4e28-ad16-6e1267c35d7f-00001.parquet
Data File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-07/00011-834-6ed85fcc-cf79-4762-8245-494b01333410-00001.parquet
Data File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-09/00072-1031-4166a8f5-7efa-4708-9ede-13ce4bf5b75b-00001.parquet
Data File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-09/00072-211-d8611c4f-62fa-464e-b998-1dccb751e0f5-00001.parquet


In [54]:
metadata_file_path_list = catalogUtil.get_all_metadata_files()

Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00000-30b3f58c-b3d7-4cc5-8f03-2d3a6ada2c11.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00001-7d5f111e-852b-4cae-bfa3-a574cdb56a14.metadata.json
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/00002-3512e62e-a490-4057-802c-09060baa6911.metadata.json


In [55]:
catalogUtil.print_metadata_file(spark, metadata_file_path_list[2])



<IPython.core.display.JSON object>

In [56]:
manifest_lists_list = catalogUtil.get_all_manifest_lists()

Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/snap-7257983078605640789-1-cf556cc3-2af0-4bd8-9228-d4c11755e0b7.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/snap-7462507546055427409-1-ee02a218-f8f4-449c-8781-ce82b0450e0c.avro


In [57]:
catalogUtil.print_manifest_file(spark, manifest_lists_list[1]) 

                                                                                

<IPython.core.display.JSON object>

In [58]:
manifest_files_list = catalogUtil.get_all_manifest_files()

Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/cf556cc3-2af0-4bd8-9228-d4c11755e0b7-m0.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/cf556cc3-2af0-4bd8-9228-d4c11755e0b7-m1.avro
Metadata File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/metadata/ee02a218-f8f4-449c-8781-ce82b0450e0c-m0.avro


In [59]:
catalogUtil.print_manifest_file(spark, manifest_files_list[1]) 

                                                                                

<IPython.core.display.JSON object>

In [60]:
catalogUtil.print_manifest_file(spark, manifest_files_list[2]) 

<IPython.core.display.JSON object>

In [61]:
data_files_list = catalogUtil.get_all_data_layer_files()

Data File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-04/00156-1033-a93cedac-db38-4e28-ad16-6e1267c35d7f-00001.parquet
Data File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-07/00011-834-6ed85fcc-cf79-4762-8245-494b01333410-00001.parquet
Data File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-09/00072-1031-4166a8f5-7efa-4708-9ede-13ce4bf5b75b-00001.parquet
Data File Path: warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-09/00072-211-d8611c4f-62fa-464e-b998-1dccb751e0f5-00001.parquet


In [66]:
catalogUtil.print_data_file(spark, data_files_list[0])

[Stage 53:>                                                         (0 + 1) / 1]

+---------+-----------+-------------------+
|coffee_id|coffee_size|     coffee_sale_ts|
+---------+-----------+-------------------+
|        3|       tall|2023-04-01 12:01:00|
+---------+-----------+-------------------+



                                                                                

In [62]:
catalogUtil.print_data_file(spark, data_files_list[1])

[Stage 46:>                                                         (0 + 1) / 1]

+---------+-----------+-------------------+
|coffee_id|coffee_size|     coffee_sale_ts|
+---------+-----------+-------------------+
|        2|     grande|2023-07-01 11:10:00|
+---------+-----------+-------------------+



                                                                                

NB: The row with coffee_id = 1 was a match between the target and staging tables. Because the table uses copy-on-write (COW), Iceberg wrote a new data file containing that row with the updated "coffee_size" value; the original data file was left unchanged.
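
The idea behind copy-on-write can be sketched with a toy model in which data files are immutable lists of rows and each snapshot lists the files it considers live. This is a simplified illustration, not Iceberg's implementation. The names `storage`, `snapshots`, and `copy_on_write_update`, and the file names, are hypothetical.

```python
def copy_on_write_update(storage, snapshots, coffee_id, new_size):
    """Rewrite every live file holding coffee_id and commit a new snapshot."""
    current = snapshots[-1]
    new_id = current["id"] + 1
    live_files = []
    for position, name in enumerate(current["files"]):
        rows = storage[name]
        if any(row["coffee_id"] == coffee_id for row in rows):
            # Copy the whole file, changing only the affected rows.
            new_name = f"rewrite-{new_id}-{position}.parquet"
            storage[new_name] = [
                {**row, "coffee_size": new_size}
                if row["coffee_id"] == coffee_id
                else dict(row)
                for row in rows
            ]
            live_files.append(new_name)
        else:
            live_files.append(name)  # untouched files are carried over as-is
    snapshots.append({"id": new_id, "files": live_files})


storage = {"file-A.parquet": [{"coffee_id": 1, "coffee_size": "venti"}]}
snapshots = [{"id": 1, "files": ["file-A.parquet"]}]

copy_on_write_update(storage, snapshots, coffee_id=1, new_size="grande")

for snapshot in snapshots:
    for name in snapshot["files"]:
        print(snapshot["id"], name, storage[name])
```

The first snapshot still points at the original file with the `venti` row, while the second points at the rewritten file. This mirrors the two files under `coffee_sale_ts_month=2023-09` in the listing above.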

In [69]:
print("The new data file with the updated row (size grande):")
data_files_list[2]

The new data file with the updated row (size grande):


'warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-09/00072-1031-4166a8f5-7efa-4708-9ede-13ce4bf5b75b-00001.parquet'

In [63]:
catalogUtil.print_data_file(spark, data_files_list[2])

+---------+-----------+-------------------+
|coffee_id|coffee_size|     coffee_sale_ts|
+---------+-----------+-------------------+
|        1|     grande|2023-09-01 10:00:00|
+---------+-----------+-------------------+



                                                                                

In [70]:
print("The old data file with the original row (size venti):")
data_files_list[3]

The old data file with the original row (size venti):


'warehouse/tablespace/external/hive/lakehouse.db/coffees_table/data/coffee_sale_ts_month=2023-09/00072-211-d8611c4f-62fa-464e-b998-1dccb751e0f5-00001.parquet'

In [64]:
catalogUtil.print_data_file(spark, data_files_list[3])

+---------+-----------+-------------------+
|coffee_id|coffee_size|     coffee_sale_ts|
+---------+-----------+-------------------+
|        1|      venti|2023-09-01 10:00:00|
+---------+-----------+-------------------+



                                                                                

In [65]:
spark.sql("SELECT * FROM lakehouse.coffees_table").show()

+---------+-----------+-------------------+
|coffee_id|coffee_size|     coffee_sale_ts|
+---------+-----------+-------------------+
|        2|     grande|2023-07-01 11:10:00|
|        1|     grande|2023-09-01 10:00:00|
|        3|       tall|2023-04-01 12:01:00|
+---------+-----------+-------------------+



                                                                                

#### There is a new metadata file (json) prefixed by 0002.

#### There is a new manifest list file (avro) prefixed by "snap"

#### There is a new manifest file (avro)
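
The three kinds of files can be told apart by name alone. The sketch below assumes the usual Iceberg naming conventions (table metadata ends in `.metadata.json`, manifest lists are Avro files starting with `snap-`, other Avro files are manifests). The `classify_metadata_file` helper and the example keys are hypothetical and only illustrate the idea.

```python
from collections import Counter


def classify_metadata_file(key: str) -> str:
    """Classify an object key from the table's metadata folder by its file name."""
    name = key.rsplit("/", 1)[-1]
    if name.endswith(".metadata.json"):
        return "table metadata"
    if name.startswith("snap-") and name.endswith(".avro"):
        return "manifest list"
    if name.endswith(".avro"):
        return "manifest"
    return "other"


# Hypothetical keys; in the notebook these would come from the S3 listing.
example_keys = [
    "lakehouse.db/coffees_table/metadata/00001-1111.metadata.json",
    "lakehouse.db/coffees_table/metadata/00002-2222.metadata.json",
    "lakehouse.db/coffees_table/metadata/snap-123-1-3333.avro",
    "lakehouse.db/coffees_table/metadata/4444-m0.avro",
]

counts = Counter(classify_metadata_file(k) for k in example_keys)
for kind, n in sorted(counts.items()):
    print(f"{kind}: {n}")
```

Running the same count before and after a write makes it easy to confirm which kinds of files a given operation added.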

### Time Travel 

In [None]:
snapshots_df = spark.sql("SELECT * FROM lakehouse.customer_table.snapshots;")

In [None]:
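# Order by commit time so that the first row is the table's oldest snapshot
first_snapshot = snapshots_df.orderBy("committed_at").select("snapshot_id").head(1)[0][0]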

#### Validate that the output dataframe only includes one row per the original insert

In [None]:
spark.read\
    .option("snapshot-id", first_snapshot)\
    .format("iceberg")\
    .load("lakehouse.customer_table").toPandas()
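
The read above travels by snapshot ID. Iceberg's Spark reader also accepts an `as-of-timestamp` option, expressed in milliseconds since the Unix epoch. The sketch below shows one way to produce that value; the `to_epoch_millis` helper and the chosen timestamp are assumptions for illustration, and the Spark call is left as a comment because it needs a live session.

```python
from datetime import datetime, timezone


def to_epoch_millis(ts: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if ts.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid ambiguity")
    return int(round(ts.timestamp() * 1000))


# Hypothetical point in time to travel back to.
as_of = to_epoch_millis(datetime(2023, 9, 1, 10, 0, tzinfo=timezone.utc))
print(as_of)

# With a live session, the value would be passed like this:
# spark.read\
#     .option("as-of-timestamp", str(as_of))\
#     .format("iceberg")\
#     .load("lakehouse.customer_table").toPandas()
```

Requiring a timezone-aware datetime is a deliberate choice here: a naive datetime would be interpreted in the local timezone of whichever machine runs the cell.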

In [None]:
avro_tempdf = spark.read.format("avro").load("s3a://go01-demo/" + metadata_file_list[6]).toPandas()

In [None]:
avro_tempdf.columns

In [None]:
avro_tempdf['partitions']

In [None]:
avro_tempdf['added_rows_count']

In [None]:
avro_tempdf['existing_rows_count']

In [None]:
avro_tempdf['added_data_files_count']

In [None]:
print("Showing " + metadata_file_list[2])
json_tempdf = spark.read.option("multiline","true").json("s3a://go01-demo/" + metadata_file_list[2]).toPandas()

In [None]:
json_tempdf.columns

In [None]:
json_tempdf['current-schema-id']

In [None]:
list(json_tempdf['snapshots'])

In [None]:
json_tempdf['partition-spec']

In [None]:
list(json_tempdf['partition-specs'])

In [None]:
spark.sql("SELECT * FROM lakehouse.coffees_table_2.all_data_files;").show()

### Partition Evolution

Spark partitioning splits the data into multiple partitions so that transformations can run on them in parallel, which lets the job complete faster. You can also write partitioned data to a file system (as multiple sub-directories) for faster reads by downstream systems.

Spark offers several partitioning methods to achieve parallelism; choose the one that fits your needs.

#### Creating New Data to Test Partition Evolution

In [None]:
from pyspark.sql.types import LongType, IntegerType, StringType

import dbldatagen as dg

shuffle_partitions_requested = 20
device_population = 100000
data_rows = 20 * 1000000
#partitions_requested = 20

spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)

country_codes = [
    "CN", "US", "FR", "CA", "IN", "JM", "IE", "PK", "GB", "IL", "AU", 
    "SG", "ES", "GE", "MX", "ET", "SA", "LB", "NL", "IT"
]
#country_weights = [
#    1300, 365, 67, 38, 1300, 3, 7, 212, 67, 9, 25, 6, 47, 83, 
#    126, 109, 58, 8, 17,
#]

manufacturers = [
    "Delta corp", "Xyzzy Inc.", "Lakehouse Ltd", "Acme Corp", "Embanks Devices",
]

lines = ["delta", "xyzzy", "lakehouse", "gadget", "droid"]

testDataSpec = (
    dg.DataGenerator(spark, name="device_data_set", rows=data_rows) 
                     #,partitions=partitions_requested)
    .withIdOutput()
    # we'll use hash of the base field to generate the ids to
    # avoid a simple incrementing sequence
    .withColumn("internal_device_id", "long", minValue=0x1000000000000, 
                uniqueValues=device_population, omit=True, baseColumnType="hash",
    )
    # note for format strings, we must use "%lx" not "%x" as the
    # underlying value is a long
    .withColumn(
        "device_id", "string", format="0x%013x", baseColumn="internal_device_id"
    )
    # the device / user attributes will be the same for the same device id,
    # so let's use the internal device id as the base column for these attributes
    .withColumn("country", "string", values=country_codes, #weights=country_weights, 
                baseColumn="internal_device_id")
    .withColumn("manufacturer", "string", values=manufacturers, 
                baseColumn="internal_device_id", )
    # use omit = True if you don't want a column to appear in the final output
    # but just want to use it as part of generation of another column
    .withColumn("line", "string", values=lines, baseColumn="manufacturer", 
                baseColumnType="hash", omit=True )
    .withColumn("model_ser", "integer", minValue=1, maxValue=11, baseColumn="device_id", 
                baseColumnType="hash", omit=True, )
    .withColumn("model_line", "string", expr="concat(line, '#', model_ser)", 
                baseColumn=["line", "model_ser"] )
    .withColumn("event_type", "string", 
                values=["activation", "deactivation", "plan change", "telecoms activity", 
                        "internet activity", "device error", ],
                random=True)
    .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", 
                end="2020-12-31 23:59:00", 
                interval="1 minute", random=True )
)

dfTestData = testDataSpec.build()

display(dfTestData)

In [None]:
dfTestData.head()

In [None]:
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

In [None]:
# Drop the table that the next cells create, so that create() does not fail on a re-run
spark.sql("DROP TABLE IF EXISTS spark_catalog.lakehouse.p_evol_tbl PURGE")

In [None]:
#dfTestData.groupBy("country").count().show()

In [None]:
#dfTestData.rdd.getNumPartitions()

Before writing to a partitioned table, Iceberg requires the data within each task (Spark partition) to be sorted according to the table's partition spec. This applies to both SQL writes and DataFrame writes.
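
A toy model helps show why the ordering matters. The sketch below is not Iceberg's writer: `ClusteredWriter` and `write_task` are hypothetical names, and the model simply assumes a writer that keeps one file open per task, closes it when the partition value changes, and refuses to reopen a partition it has already closed.

```python
from typing import Iterable, Optional, Set


class ClusteredWriter:
    """Toy writer that holds a single open file per task (hypothetical)."""

    def __init__(self) -> None:
        self.closed: Set[str] = set()
        self.current: Optional[str] = None
        self.files_written = 0

    def write(self, partition_key: str) -> None:
        if partition_key == self.current:
            return  # same partition, keep appending to the open file
        if partition_key in self.closed:
            raise RuntimeError(
                f"already closed file for partition {partition_key!r}"
            )
        if self.current is not None:
            self.closed.add(self.current)
        self.current = partition_key
        self.files_written += 1


def write_task(keys: Iterable[str]) -> int:
    """Write one task's rows and return how many files were opened."""
    writer = ClusteredWriter()
    for key in keys:
        writer.write(key)
    return writer.files_written


# Partition values of the rows in one hypothetical task.
task_rows = ["US", "FR", "US", "CA", "FR"]

sorted_files = write_task(sorted(task_rows))
print("files written for sorted input:", sorted_files)

try:
    write_task(task_rows)
    unsorted_ok = True
except RuntimeError as err:
    unsorted_ok = False
    print("unsorted input failed:", err)
```

Under this model, sorting within the task is what guarantees each partition value is visited once, which is the role `sortWithinPartitions("country")` plays in the write that follows.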

In [None]:
dfTestData.sortWithinPartitions("country")\
    .writeTo("spark_catalog.lakehouse.p_evol_tbl")\
    .partitionedBy("country")\
    .using("iceberg")\
    .create()  # other write modes: .append(), .replace(), .overwritePartitions()

In [None]:
#spark.sql("SELECT * FROM spark_catalog.lakehouse.part_evol_tbl.PARTITIONS").show()

In [None]:
#spark.sql("SELECT * FROM spark_catalog.lakehouse.part_evol_tbl.files").show()

In [None]:
#spark.sql("SELECT * FROM spark_catalog.lakehouse.part_evol_tbl.manifests").show()

In [None]:
#spark.sql("SELECT * FROM spark_catalog.lakehouse.part_evol_tbl.all_manifests").show()

In [None]:
#spark.sql("SELECT * FROM spark_catalog.lakehouse.part_evol_tbl.all_data_files").show()

In [None]:
#spark.sql("SELECT * FROM spark_catalog.lakehouse.part_evol_tbl.snapshots").show()

Adding a partition field is a metadata operation and does not change any of the existing table data. New data will be written with the new partitioning, but existing data will remain in the old partition layout. Old data files will have null values for the new partition fields in metadata tables.
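
The following pure-Python sketch illustrates that statement with a toy model. The data structures are hypothetical and much simpler than Iceberg's metadata: each data file remembers the partition spec it was written under, and a view over the current spec fills fields that did not exist at write time with `None`. The field name `event_ts_month` is used here for illustration.

```python
from typing import Dict, List, Optional

# Partition specs by id: spec 1 adds a month field to spec 0.
specs: Dict[int, List[str]] = {
    0: ["country"],
    1: ["country", "event_ts_month"],
}

# Hypothetical data files, each tagged with the spec it was written under.
data_files = [
    {"path": "f1.parquet", "spec_id": 0, "partition": {"country": "US"}},
    {
        "path": "f2.parquet",
        "spec_id": 1,
        "partition": {"country": "US", "event_ts_month": "2020-03"},
    },
]


def partition_view(
    files: List[dict], current_fields: List[str]
) -> List[Dict[str, Optional[str]]]:
    """Project every file onto the current spec; missing fields become None."""
    return [
        {
            "path": f["path"],
            **{name: f["partition"].get(name) for name in current_fields},
        }
        for f in files
    ]


view = partition_view(data_files, specs[1])
for row in view:
    print(row)
```

Nothing about `f1.parquet` changes when the spec evolves; only the way it is reported against the newer spec does.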

In [None]:
print("TABLE PARTITIONS BEFORE ALTER PARTITION STATEMENT: ")
spark.sql("SELECT * FROM spark_catalog.lakehouse.p_evol_tbl.PARTITIONS").show()

In [None]:
print("ADD PARTITION BY EVENT TIMESTAMP MONTHS: ")
print("ALTER TABLE spark_catalog.lakehouse.p_evol_tbl ADD PARTITION FIELD months(event_ts)")
spark.sql("ALTER TABLE spark_catalog.lakehouse.p_evol_tbl ADD PARTITION FIELD months(event_ts)")
#spark.sql("ALTER TABLE spark_catalog.lakehouse.part_evol_tbl REPLACE PARTITION FIELD hours(dob) WITH state")
#spark.sql("ALTER TABLE prod.db.sample ADD PARTITION FIELD month")

#ALTER TABLE spark_catalog.lakehouse.part_evol_tbl ADD PARTITION FIELD days(event_ts)

In [None]:
print("TABLE PARTITIONS AFTER ALTER PARTITION STATEMENT: ")
spark.sql("SELECT * FROM spark_catalog.lakehouse.p_evol_tbl.PARTITIONS").show()

In [None]:
appendDf = dfTestData.sample(fraction=0.3, seed=3)

In [None]:
appendDf.dtypes

In [None]:
appendDf.rdd.getNumPartitions()

In [None]:
appendDf.show()

In [None]:
appendDf.sortWithinPartitions("country").show()

In [None]:
#appendDf.sortWithinPartitions("country", "month(event_ts)").show()

In [None]:
appendDf.sortWithinPartitions("country")\
    .writeTo("spark_catalog.lakehouse.p_evol_tbl")\
    .using("iceberg")\
    .append()  # other write modes: .replace(), .overwritePartitions(), .create()

In [None]:
print("TABLE PARTITIONS AFTER APPEND: ")
spark.sql("SELECT * FROM spark_catalog.lakehouse.p_evol_tbl.PARTITIONS").show(100)

Dropping a partition field is a metadata operation and does not change any of the existing table data. New data will be written with the new partitioning, but existing data will remain in the old partition layout.

In [None]:
# Drop the partition field added earlier on the table created in this section
spark.sql("ALTER TABLE spark_catalog.lakehouse.p_evol_tbl DROP PARTITION FIELD months(event_ts)")

In [None]:
print("TABLE PARTITIONS AFTER ALTER PARTITION STATEMENT: ")
spark.sql("SELECT * FROM spark_catalog.lakehouse.p_evol_tbl.PARTITIONS").show()

##### Only JSON metadata files have been added (one for each time you changed the partitioning); the Avro files have stayed the same

In [None]:
s3 = boto3.resource('s3')
my_bucket = s3.Bucket("go01-demo")

metadata_file_list = []

print("Current Metadata Files: \n")

for object_summary in my_bucket.objects.filter(Prefix=metadata_path+"/metadata"):
    #print(object_summary.key +"\n")
    metadata_file_list.append(object_summary.key)
    
metadata_file_list

In [None]:
spark.sql("CREATE TABLE IF NOT EXISTS customer_table (id BIGINT, state STRING, country STRING, dob TIMESTAMP) USING iceberg PARTITIONED BY ( hours(dob))")

In [None]:
spark.sql("SELECT HOUR(dob) FROM spark_catalog.lakehouse.customer_table").show()

In [None]:
spark.sql("SELECT DAY(dob) FROM spark_catalog.lakehouse.customer_table").show()

### Dropping Tables

In [None]:
spark.sql("DROP TABLE IF EXISTS lakehouse.staging")

Validate that the metadata folder is now empty but the data folder still retains parquet files.

![alt text](../img/s3_droptable_1.png)

![alt text](../img/s3_droptable_2.png)

![alt text](../img/s3_droptable_3.png)

In [None]:
spark.sql("ALTER TABLE lakehouse.customers_table\
            SET TBLPROPERTIES ('format-version' = '2')")

In [None]:
s3 = boto3.resource('s3')
my_bucket = s3.Bucket("go01-demo")

metadata_file_list = []

for object_summary in my_bucket.objects.filter(Prefix=metadata_path):
    print(object_summary.key +"\n")
    metadata_file_list.append(object_summary.key)

In [None]:
print("Showing " + metadata_file_list[3])
spark.read.option("multiline","true").json("s3a://go01-demo/" + metadata_file_list[3]).toPandas()