## Introduction to Iceberg Architecture

#### Launching a Spark Session with Iceberg

In [2]:
import cml.data_v1 as cmldata

CONNECTION_NAME = "go01-aw-dl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

In [3]:
spark.sparkContext.getConf().getAll()

24/03/16 00:58:56 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.


[('spark.kubernetes.driver.pod.name', 'ikgvjqk3qptnit2c'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.ui.proxyRedirectUri',
  'https://spark-ikgvjqk3qptnit2c.ml-d7f9c760-9de.go01-dem.ylcu-atmi.cloudera.site'),
 ('spark.network.crypto.enabled', 'true'),
 ('spark.sql.hive.hwc.execution.mode', 'spark'),
 ('spark.driver.port', '34509'),
 ('spark.jars',
  '/opt/spark/optional-lib/hive-warehouse-connector-assembly.jar,/opt/spark/optional-lib/iceberg-hive-runtime.jar,/opt/spark/optional-lib/iceberg-spark-runtime.jar'),
 ('spark.driver.host', '100.100.8.239'),
 ('spark.kerberos.renewal.credentials', 'ccache'),
 ('spark.sql.catalog.spark_catalog',
  'org.apache.iceberg.spark.SparkSessionCatalog'),
 ('spark.driver.bindAddress', '100.100.8.239'),
 ('spark.eventLog.dir', 'file:///sparkeventlogs'),
 ('spark.dynamicAllocation.maxExecutors', '249'),
 ('spark.hadoop.yarn.resourcemanager.principal', 'pauldefusco'),
 ('spark.kubernetes.driver.annotation.cluster-autoscaler.kubernetes.io/safe-to-evict'

### Iceberg Architecture

![Iceberg metadata architecture](img/iceberg-metadata.png)

#### Iceberg Catalog

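Iceberg provides catalogs that let SQL commands manage tables and load them by name. Catalogs are configured with properties under `spark.sql.catalog.<catalog_name>`. In this session, the built-in `spark_catalog` is set to `org.apache.iceberg.spark.SparkSessionCatalog` (see the configuration output above), which adds Iceberg table support to Spark's session catalog and delegates non-Iceberg tables to it.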
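To illustrate that naming scheme, the sketch below groups the `spark.sql.catalog.*` entries from a list of `(key, value)` pairs, the shape returned by `spark.sparkContext.getConf().getAll()`, by catalog name. The helper `catalog_properties` is hypothetical, and the `spark_catalog.type` entry in the sample is an assumed example that does not appear in the (truncated) configuration dump above.

```python
from collections import defaultdict

PREFIX = "spark.sql.catalog."


def catalog_properties(conf_pairs):
    """Group spark.sql.catalog.* settings by catalog name (hypothetical helper).

    spark.sql.catalog.<name>          -> the catalog implementation class
    spark.sql.catalog.<name>.<option> -> an option for that catalog
    """
    catalogs = defaultdict(dict)
    for key, value in conf_pairs:
        if not key.startswith(PREFIX):
            continue  # not a catalog setting
        name, _, option = key[len(PREFIX):].partition(".")
        # The bare key carries the class; dotted keys carry options
        catalogs[name][option or "class"] = value
    return dict(catalogs)


# The first two pairs mirror the getAll() output above;
# the ".type" pair is an assumed example for illustration only
sample_conf = [
    ("spark.sql.hive.hwc.execution.mode", "spark"),
    ("spark.sql.catalog.spark_catalog",
     "org.apache.iceberg.spark.SparkSessionCatalog"),
    ("spark.sql.catalog.spark_catalog.type", "hive"),
]

print(catalog_properties(sample_conf))
```

In the notebook, the same helper could be applied directly to `spark.sparkContext.getConf().getAll()`.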

In [4]:
# Show catalog and database
spark.sql("SHOW CURRENT NAMESPACE").show()



+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|  default|
+-------------+---------+



In [6]:
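import os

# Build a per-user database name from the CML project owner
username = os.environ["PROJECT_OWNER"]

# Optionally start from scratch:
# spark.sql("DROP DATABASE IF EXISTS spark_catalog.{}_lakehouse".format(username))

# Create a new database and make it the current namespace
spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.{}_lakehouse".format(username))
spark.sql("USE spark_catalog.{}_lakehouse".format(username))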

DataFrame[]

In [7]:
spark.sql("USE spark_catalog.{}_lakehouse".format(username))

DataFrame[]

In [8]:
# Show catalog and database
spark.sql("SHOW CURRENT NAMESPACE").show()

+-------------+--------------------+
|      catalog|           namespace|
+-------------+--------------------+
|spark_catalog|pauldefusco_lakeh...|
+-------------+--------------------+



#### Create an Iceberg Table with Spark SQL

In [9]:
spark.sql("DROP TABLE IF EXISTS {}_lakehouse.coffees_table PURGE".format(username))

24/03/16 01:02:28 WARN HiveMetaStoreClient: Failed to connect to the MetaStore Server...


DataFrame[]

#### Iceberg Create Table 

In [10]:
spark.sql("CREATE TABLE IF NOT EXISTS {}_lakehouse.coffees_table (coffee_id BIGINT, coffee_size STRING, coffee_sale_ts TIMESTAMP)\
          USING ICEBERG\
          PARTITIONED BY (months(coffee_sale_ts))\
          TBLPROPERTIES ('write.delete.mode'='copy-on-write',\
                          'write.update.mode'='copy-on-write',\
                          'write.merge.mode'='copy-on-write',\
                          'format-version' = '2')".format(username))

DataFrame[]
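`PARTITIONED BY (months(coffee_sale_ts))` does not store a `yyyy-MM` string: Iceberg's `month` transform stores the number of whole months since 1970-01. The plain-Python sketch below (hypothetical helper names, no Spark required, assuming the timestamps are evaluated in UTC) reproduces that arithmetic, which is where partition values such as `644`, `642` and `639` in the `all_data_files` output later in this notebook come from.

```python
from datetime import datetime


def month_ordinal(ts: datetime) -> int:
    """Months since 1970-01, as computed by Iceberg's month transform (UTC assumed)."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def month_label(ordinal: int) -> str:
    """Turn a month ordinal back into a human-readable yyyy-MM label."""
    year, month_index = divmod(ordinal, 12)
    return f"{1970 + year:04d}-{month_index + 1:02d}"


# Sale timestamps used in this notebook
for ts in (datetime(2023, 9, 1, 10, 0),
           datetime(2023, 7, 1, 11, 0),
           datetime(2023, 4, 1, 12, 1)):
    ordinal = month_ordinal(ts)
    print(ts, "->", ordinal, month_label(ordinal))
```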

#### Verify that a metadata JSON file has been created in the table's metadata directory

#### Notice that no snapshots or other files have been created yet, because no data has been inserted.

In [12]:
spark.sql("SELECT * FROM {}_lakehouse.coffees_table.history".format(username)).show()

+---------------+-----------+---------+-------------------+
|made_current_at|snapshot_id|parent_id|is_current_ancestor|
+---------------+-----------+---------+-------------------+
+---------------+-----------+---------+-------------------+



In [13]:
spark.sql("SELECT * FROM {}_lakehouse.coffees_table.snapshots".format(username)).show()

+------------+-----------+---------+---------+-------------+-------+
|committed_at|snapshot_id|parent_id|operation|manifest_list|summary|
+------------+-----------+---------+---------+-------------+-------+
+------------+-----------+---------+---------+-------------+-------+



In [14]:
spark.sql("SELECT * FROM {}_lakehouse.coffees_table.files".format(username)).show()

+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+
|content|file_path|file_format|spec_id|partition|record_count|file_size_in_bytes|column_sizes|value_counts|null_value_counts|nan_value_counts|lower_bounds|upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|
+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+
+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+



In [15]:
spark.sql("SELECT * FROM {}_lakehouse.coffees_table.manifests".format(username)).show()

+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+
|content|path|length|partition_spec_id|added_snapshot_id|added_data_files_count|existing_data_files_count|deleted_data_files_count|added_delete_files_count|existing_delete_files_count|deleted_delete_files_count|partition_summaries|
+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+
+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+



In [16]:
spark.sql("SELECT * FROM {}_lakehouse.coffees_table.all_data_files".format(username)).show()

+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+
|content|file_path|file_format|spec_id|partition|record_count|file_size_in_bytes|column_sizes|value_counts|null_value_counts|nan_value_counts|lower_bounds|upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|
+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+
+-------+---------+-----------+-------+---------+------------+------------------+------------+------------+-----------------+----------------+------------+------------+------------+-------------+------------+-------------+



In [17]:
spark.sql("SELECT * FROM {}_lakehouse.coffees_table.all_manifests".format(username)).show()

+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+---------------------+
|content|path|length|partition_spec_id|added_snapshot_id|added_data_files_count|existing_data_files_count|deleted_data_files_count|added_delete_files_count|existing_delete_files_count|deleted_delete_files_count|partition_summaries|reference_snapshot_id|
+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+---------------------+
+-------+----+------+-----------------+-----------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+---------------------+
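Each metadata table queried above is addressed by appending its name to the table identifier. If you want to inspect them all in one pass, a loop like the following is one option. The helper `metadata_queries` and the `example_user` name are illustrative assumptions; in the notebook, each string would be passed to `spark.sql(...).show()`.

```python
# Metadata tables inspected in the cells above
METADATA_TABLES = ["history", "snapshots", "files", "manifests",
                   "all_data_files", "all_manifests"]


def metadata_queries(username: str, table: str = "coffees_table") -> dict:
    """Build one SELECT per Iceberg metadata table (hypothetical helper)."""
    base = f"{username}_lakehouse.{table}"
    return {name: f"SELECT * FROM {base}.{name}" for name in METADATA_TABLES}


for name, query in metadata_queries("example_user").items():
    print(name, "->", query)
    # In the notebook: spark.sql(query).show()
```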

### Table Insert

In [18]:
from pyspark.sql.functions import date_format

In [19]:
# Row: coffee_id = 1, coffee_size = venti, coffee_sale_ts = 2023-09-01 10:00:00

spark.sql("INSERT INTO {}_lakehouse.coffees_table VALUES (1, 'venti', cast(date_format('2023-09-01 10:00:00', 'yyyy-MM-dd HH:mm:ss') as timestamp))".format(username))

                                                                                

DataFrame[]

#### Data has been added to the data folder

###### Notice that the history and snapshots tables have now been populated with a record reflecting the first INSERT operation

In [20]:
QUERY = "select h.made_current_at,\
            s.operation,\
            h.snapshot_id,\
            h.is_current_ancestor,\
            s.summary['spark.app.id']\
        from {0}_lakehouse.coffees_table.history h\
        join {0}_lakehouse.coffees_table.snapshots s\
            on h.snapshot_id = s.snapshot_id\
            order by made_current_at;".format(username)

In [22]:
spark.sql(QUERY).show()


+--------------------+---------+-------------------+-------------------+---------------------+
|     made_current_at|operation|        snapshot_id|is_current_ancestor|summary[spark.app.id]|
+--------------------+---------+-------------------+-------------------+---------------------+
|2024-03-16 01:09:...|   append|4358765587127049418|               true| spark-application...|
+--------------------+---------+-------------------+-------------------+---------------------+



                                                                                

In [26]:
spark.sql("SELECT * FROM {}_lakehouse.coffees_table.all_manifests".format(username)).show()

+-------+--------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+--------------------+---------------------+
|content|                path|length|partition_spec_id|  added_snapshot_id|added_data_files_count|existing_data_files_count|deleted_data_files_count|added_delete_files_count|existing_delete_files_count|deleted_delete_files_count| partition_summaries|reference_snapshot_id|
+-------+--------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+--------------------+---------------------+
|      0|s3a://go01-demo/w...|  7076|                0|4358765587127049418|                     1|                        0|                       0|                       0|       

### Table Merge Into

Create a staging table

In [27]:
spark.sql("DROP TABLE IF EXISTS {}_lakehouse.coffee_staging PURGE".format(username))

DataFrame[]

In [28]:
spark.sql("CREATE TABLE IF NOT EXISTS {}_lakehouse.coffee_staging\
            (coffee_id BIGINT, coffee_size STRING, coffee_sale_ts TIMESTAMP)\
            USING iceberg\
            PARTITIONED BY (months(coffee_sale_ts))".format(username))

DataFrame[]

In [29]:
spark.sql("INSERT INTO {}_lakehouse.coffee_staging\
            VALUES (1, 'grande', cast(date_format('2023-07-01 11:00:00', 'yyyy-MM-dd HH:mm:ss') as timestamp)),\
            (2, 'grande', cast(date_format('2023-07-01 11:10:00', 'yyyy-MM-dd HH:mm:ss') as timestamp)),\
            (3, 'tall', cast(date_format('2023-04-01 12:01:00', 'yyyy-MM-dd HH:mm:ss') as timestamp))".format(username))

# Rows inserted into the staging table:
# Row: coffee_id = 1, coffee_size = grande, coffee_sale_ts = 2023-07-01
# Row: coffee_id = 2, coffee_size = grande, coffee_sale_ts = 2023-07-01
# Row: coffee_id = 3, coffee_size = tall, coffee_sale_ts = 2023-04-01

                                                                                

DataFrame[]

Merge the staging table into coffees_table

In [31]:
spark.sql("MERGE INTO {0}_lakehouse.coffees_table c\
            USING (SELECT * FROM {0}_lakehouse.coffee_staging) s\
            ON c.coffee_id = s.coffee_id \
            WHEN MATCHED THEN UPDATE SET c.coffee_size = s.coffee_size\
            WHEN NOT MATCHED THEN INSERT *".format(username))


DataFrame[]
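At the row level, this MERGE updates `coffee_size` for rows whose `coffee_id` already exists and inserts the rest. The sale timestamp of a matched row is left untouched because only `coffee_size` appears in the `UPDATE SET` clause, so row 1 stays in the 2023-09 partition. The sketch below models those semantics with plain Python dictionaries keyed by `coffee_id`; the `merge_into` helper is hypothetical and illustrates the result only, not how Iceberg executes the statement.

```python
def merge_into(target, source, update_cols):
    """Model MERGE ... WHEN MATCHED THEN UPDATE SET ... WHEN NOT MATCHED THEN INSERT *.

    target/source: dict mapping the join key (coffee_id) to a row dict.
    update_cols: columns copied from the source row when the key matches.
    """
    result = {key: dict(row) for key, row in target.items()}  # leave input intact
    for key, src_row in source.items():
        if key in result:
            # WHEN MATCHED: only the listed columns change
            for col in update_cols:
                result[key][col] = src_row[col]
        else:
            # WHEN NOT MATCHED: INSERT * copies the whole source row
            result[key] = dict(src_row)
    return result


coffees = {1: {"coffee_size": "venti", "coffee_sale_ts": "2023-09-01 10:00:00"}}
staging = {
    1: {"coffee_size": "grande", "coffee_sale_ts": "2023-07-01 11:00:00"},
    2: {"coffee_size": "grande", "coffee_sale_ts": "2023-07-01 11:10:00"},
    3: {"coffee_size": "tall", "coffee_sale_ts": "2023-04-01 12:01:00"},
}

merged = merge_into(coffees, staging, update_cols=["coffee_size"])
for key in sorted(merged):
    print(key, merged[key])
```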

In [33]:
spark.sql("SELECT * FROM {}_lakehouse.coffees_table.snapshots;".format(username)).show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2024-03-16 01:09:...|4358765587127049418|               null|   append|s3a://go01-demo/w...|{spark.app.id -> ...|
|2024-03-16 01:14:...|5428666139761163381|4358765587127049418|overwrite|s3a://go01-demo/w...|{spark.app.id -> ...|
|2024-03-16 01:14:...| 779761274742802786|5428666139761163381|overwrite|s3a://go01-demo/w...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
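Each snapshot records its `parent_id`, so the table's lineage can be recovered by walking parent links back from the current snapshot; the `is_current_ancestor` column of the `history` table reflects this kind of reachability. Below is a minimal sketch using the three snapshot IDs shown above. The function name `ancestry` is hypothetical, and treating the latest commit as the current snapshot is an assumption that holds here because no rollback has been performed.

```python
# (snapshot_id, parent_id) pairs copied from the snapshots output above
SNAPSHOTS = [
    (4358765587127049418, None),
    (5428666139761163381, 4358765587127049418),
    (779761274742802786, 5428666139761163381),
]


def ancestry(current_id, snapshots):
    """Return the current snapshot followed by its ancestors, newest first."""
    parent_of = dict(snapshots)
    chain = []
    node = current_id
    while node is not None:
        chain.append(node)
        node = parent_of.get(node)  # None once the root snapshot is reached
    return chain


current = SNAPSHOTS[-1][0]  # assumed current: the latest commit
for snapshot_id in ancestry(current, SNAPSHOTS):
    print(snapshot_id)
```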



In [34]:
spark.sql("SELECT * FROM {}_lakehouse.coffees_table.manifests;".format(username)).toPandas()

                                                                                

|   | content | path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | added_delete_files_count | existing_delete_files_count | deleted_delete_files_count | partition_summaries |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | s3a://go01-demo/warehouse/tablespace/external/... | 7216 | 0 | 779761274742802786 | 3 | 0 | 0 | 0 | 0 | 0 | [(False, False, 2023-04, 2023-09)] |
| 1 | 0 | s3a://go01-demo/warehouse/tablespace/external/... | 7214 | 0 | 779761274742802786 | 0 | 0 | 3 | 0 | 0 | 0 | [(False, False, 2023-04, 2023-09)] |
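Both manifests above belong to the latest snapshot (`779761274742802786`): one records three added data files and the other three deleted ones. With copy-on-write, a replaced file is recorded as deleted in the new snapshot's manifests rather than physically removed, so older snapshots can still read it until those snapshots are expired. The sketch below tallies the counts copied from that output to get the number of data files live in the current snapshot; the helper names are hypothetical.

```python
# Counts copied from the manifests output above (one dict per manifest)
manifests = [
    {"added_data_files_count": 3, "existing_data_files_count": 0,
     "deleted_data_files_count": 0},
    {"added_data_files_count": 0, "existing_data_files_count": 0,
     "deleted_data_files_count": 3},
]


def live_data_files(manifest_rows):
    """Files visible in the snapshot: entries added or carried over as existing."""
    return sum(m["added_data_files_count"] + m["existing_data_files_count"]
               for m in manifest_rows)


def removed_data_files(manifest_rows):
    """Files this snapshot dropped; older snapshots can still reference them."""
    return sum(m["deleted_data_files_count"] for m in manifest_rows)


print("live:", live_data_files(manifests))        # 3
print("removed:", removed_data_files(manifests))  # 3
```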


In [35]:
spark.sql("SELECT * FROM {}_lakehouse.coffees_table.all_data_files;".format(username)).toPandas()

|   | content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | s3a://go01-demo/warehouse/tablespace/external/... | PARQUET | 0 | (644,) | 1 | 976 | {1: 33, 2: 34, 3: 39} | {1: 1, 2: 1, 3: 1} | {1: 0, 2: 0, 3: 0} | {} | {1: [1, 0, 0, 0, 0, 0, 0, 0], 2: [118, 101, 11... | {1: [1, 0, 0, 0, 0, 0, 0, 0], 2: [118, 101, 11... |  | [4] |  | 0 |
| 1 | 0 | s3a://go01-demo/warehouse/tablespace/external/... | PARQUET | 0 | (642,) | 1 | 999 | {1: 39, 2: 41, 3: 39} | {1: 1, 2: 1, 3: 1} | {1: 0, 2: 0, 3: 0} | {} | {1: [2, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97... | {1: [2, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97... |  | [4] |  | 0 |
| 2 | 0 | s3a://go01-demo/warehouse/tablespace/external/... | PARQUET | 0 | (644,) | 1 | 999 | {1: 39, 2: 41, 3: 39} | {1: 1, 2: 1, 3: 1} | {1: 0, 2: 0, 3: 0} | {} | {1: [1, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97... | {1: [1, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97... |  | [4] |  | 0 |
| 3 | 0 | s3a://go01-demo/warehouse/tablespace/external/... | PARQUET | 0 | (639,) | 1 | 985 | {1: 39, 2: 39, 3: 39} | {1: 1, 2: 1, 3: 1} | {1: 0, 2: 0, 3: 0} | {} | {1: [3, 0, 0, 0, 0, 0, 0, 0], 2: [116, 97, 108... | {1: [3, 0, 0, 0, 0, 0, 0, 0], 2: [116, 97, 108... |  | [4] |  | 0 |
| 4 | 0 | s3a://go01-demo/warehouse/tablespace/external/... | PARQUET | 0 | (642,) | 1 | 999 | {1: 39, 2: 41, 3: 39} | {1: 1, 2: 1, 3: 1} | {1: 0, 2: 0, 3: 0} | {} | {1: [2, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97... | {1: [2, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97... |  | [4] |  | 0 |
| 5 | 0 | s3a://go01-demo/warehouse/tablespace/external/... | PARQUET | 0 | (644,) | 1 | 999 | {1: 39, 2: 41, 3: 39} | {1: 1, 2: 1, 3: 1} | {1: 0, 2: 0, 3: 0} | {} | {1: [1, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97... | {1: [1, 0, 0, 0, 0, 0, 0, 0], 2: [103, 114, 97... |  | [4] |  | 0 |
| 6 | 0 | s3a://go01-demo/warehouse/tablespace/external/... | PARQUET | 0 | (639,) | 1 | 985 | {1: 39, 2: 39, 3: 39} | {1: 1, 2: 1, 3: 1} | {1: 0, 2: 0, 3: 0} | {} | {1: [3, 0, 0, 0, 0, 0, 0, 0], 2: [116, 97, 108... | {1: [3, 0, 0, 0, 0, 0, 0, 0], 2: [116, 97, 108... |  | [4] |  | 0 |
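The keys `1`, `2` and `3` in the metrics columns are Iceberg field IDs for `coffee_id`, `coffee_size` and `coffee_sale_ts`. The `lower_bounds` and `upper_bounds` values use Iceberg's single-value binary serialization: a `BIGINT` (Iceberg `long`) is 8 bytes little-endian and a `STRING` is its UTF-8 bytes. That is why `coffee_id = 1` appears as `[1, 0, 0, 0, 0, 0, 0, 0]` and why `coffee_size` starts with `[118, 101, 11...` (`venti`), `[103, 114, 97...` (`grande`) or `[116, 97, 108...` (`tall`). The standard-library sketch below reproduces those bytes; the helper names are hypothetical.

```python
import struct


def encode_long(value: int) -> list:
    """Iceberg single-value encoding for long: 8-byte little-endian."""
    return list(struct.pack("<q", value))


def encode_string(value: str) -> list:
    """Iceberg single-value encoding for string: UTF-8 bytes."""
    return list(value.encode("utf-8"))


def decode_long(raw: list) -> int:
    """Inverse of encode_long."""
    return struct.unpack("<q", bytes(raw))[0]


print(encode_long(1))            # [1, 0, 0, 0, 0, 0, 0, 0]
print(encode_string("venti"))    # [118, 101, 110, 116, 105]
print(encode_string("grande"))   # [103, 114, 97, 110, 100, 101]
print(decode_long([3, 0, 0, 0, 0, 0, 0, 0]))  # 3
```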


RECAP: 
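1. The target table `coffees_table` started out empty.
2. The INSERT committed the first snapshot, which created one new metadata file, one new manifest list, one new manifest file, and one new data file.
3. We then ran a MERGE INTO from the staging table. Of the three staging rows, one (`coffee_id = 1`) matched an existing row and two did not.
    - Because the table uses copy-on-write, the matched row (now with `coffee_size = grande`) was rewritten into a brand-new data file in the same partition (`coffee_sale_ts_month=2023-09`, partition value `644`). The old file is no longer part of the current snapshot but is kept for the earlier snapshot, which is why `all_data_files` lists more than one file for that partition.
    - The two unmatched rows were written to two new data files, one in each of their respective partitions (`2023-07` and `2023-04`). Had they fallen into the same partition, they might have been written to the same data file (next example).
    - The snapshots output shows two `overwrite` snapshots after the MERGE, which suggests the MERGE cell was executed twice; each run rewrites the affected files again, consistent with the three added and three deleted files in the latest snapshot's manifests.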
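The copy-on-write bookkeeping in this recap can be modelled with a toy in-memory table in which a snapshot is just a set of data-file IDs. This is a simplified sketch with hypothetical class and method names (one file per partition per write, no manifests or metadata files), meant to show why a matched row forces its whole file to be rewritten and why the replaced file stays reachable from the older snapshot.

```python
from itertools import count


class ToyCowTable:
    """Toy copy-on-write table: a snapshot is a frozenset of immutable data-file IDs."""

    def __init__(self, partition_fn):
        self.partition_fn = partition_fn  # maps a row to its partition value
        self.files = {}                   # file_id -> (partition, tuple of rows)
        self.snapshots = [frozenset()]    # snapshot 0: the empty table
        self._next_id = count(1)

    def _write(self, rows):
        """Write rows to new files, one file per partition; return the new IDs."""
        by_partition = {}
        for row in rows:
            by_partition.setdefault(self.partition_fn(row), []).append(row)
        new_ids = set()
        for partition, part_rows in by_partition.items():
            file_id = next(self._next_id)
            self.files[file_id] = (partition, tuple(part_rows))
            new_ids.add(file_id)
        return new_ids

    def insert(self, rows):
        """Append-only commit: keep every live file and add new ones."""
        self.snapshots.append(self.snapshots[-1] | self._write(rows))

    def merge(self, source_rows, key, update_cols):
        """Copy-on-write MERGE: UPDATE SET update_cols if matched, else INSERT *."""
        live = self.snapshots[-1]
        source = {row[key]: row for row in source_rows}
        replaced, rewritten, target_keys = set(), [], set()
        for file_id in live:
            rows = self.files[file_id][1]
            target_keys.update(row[key] for row in rows)
            if not any(row[key] in source for row in rows):
                continue  # untouched files carry over into the new snapshot
            replaced.add(file_id)  # any match forces a rewrite of the whole file
            for row in rows:
                new_row = dict(row)
                if row[key] in source:
                    for col in update_cols:
                        new_row[col] = source[row[key]][col]
                rewritten.append(new_row)
        inserted = [row for k, row in source.items() if k not in target_keys]
        new_files = self._write(rewritten + inserted)
        # Replaced files leave the new snapshot but stay in self.files for older ones
        self.snapshots.append((live - replaced) | new_files)

    def rows(self, snapshot=-1):
        """All rows visible in a snapshot (default: the latest)."""
        return [row for fid in self.snapshots[snapshot] for row in self.files[fid][1]]


table = ToyCowTable(partition_fn=lambda row: row["coffee_sale_ts"][:7])  # yyyy-MM
table.insert([{"coffee_id": 1, "coffee_size": "venti",
               "coffee_sale_ts": "2023-09-01 10:00:00"}])

staging_rows = [
    {"coffee_id": 1, "coffee_size": "grande", "coffee_sale_ts": "2023-07-01 11:00:00"},
    {"coffee_id": 2, "coffee_size": "grande", "coffee_sale_ts": "2023-07-01 11:10:00"},
    {"coffee_id": 3, "coffee_size": "tall", "coffee_sale_ts": "2023-04-01 12:01:00"},
]
table.merge(staging_rows, key="coffee_id", update_cols=["coffee_size"])

for snap_no, file_ids in enumerate(table.snapshots):
    print(snap_no, sorted(file_ids))
```

Calling `merge` a second time with the same rows replaces all three live files in this toy model, because each of them now contains a matched `coffee_id`.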