# Introduction

Iceberg is a high-performance format for huge analytic tables. It brings the reliability and simplicity of SQL tables to big data. As data variety and volume keep growing, data lakes have become popular as centralized repositories that store structured and unstructured data at any scale.
Data lakes meet the storage requirements and provide analytical capabilities, but they do not address "transactional" requirements such as DML operations and ACID (Atomicity, Consistency, Isolation, Durability) transactions for concurrent read and write use cases. Consumer privacy laws such as GDPR and CCPA add further requirements that challenge traditional data lake designs.

Apache Iceberg is an open table format that addresses these data lake challenges. It enables multiple applications to work together on the same data in a transactionally consistent manner, and it defines additional information on the state of datasets as they evolve and change over time.
The Apache Iceberg table format offers capabilities similar to those of a traditional RDBMS, but in a fully open format, so multiple engines such as Spark, Trino, and Presto can operate on the same dataset. It provides powerful features such as:
- ACID transaction support for concurrent reads and writes, which provides transactional consistency between multiple applications with full read isolation.
- Time travel to go back in time and analyze changes to the data between updates and deletes.
- Rollback to a previous version, which allows users to quickly correct issues by restoring the table to an earlier state.
- Schema evolution that allows users to add, rename, and drop columns without rewriting the underlying data.
- Support for multiple file formats such as Parquet, ORC, and Avro.



### For additional details, please refer to the Iceberg documentation for Amazon EMR:
- __[EMR Iceberg Documentation](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg.html)__

## Prerequisites

To run the code in this notebook you will need:
- An AWS account

The following services and resources should be created and configured:
- EMR Studio
- EMR Studio Workspace
- EMR on EKS Virtual Cluster
- EKS Cluster (EC2 based)
- Managed Endpoint
- IAM Policy
- Application Load Balancer
- VPC and Subnet

#### <font color=red>Important step for Iceberg to work with EMR Studio (Jupyter Notebook - JUPYTER_ENTERPRISE_GATEWAY) on EMR-on-EKS cluster</font>
Because a Jupyter notebook attached to EMR on EKS does not support cell magics, Iceberg-specific parameters, including the Iceberg JARs, cannot be configured from within the notebook.
Instead, they need to be configured as part of the managed endpoint, using the configuration below. These values are defaults; you can change them to suit your needs, e.g. increase spark.executor.memory from 2G to 4G or 8G.

    "configurationOverrides": {
        "applicationConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {
                    "spark.executor.memory": "2G",
                    "spark.sql.catalog.glue_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
                    "spark.driver.memory": "2G",
                    "spark.kubernetes.executor.request.cores": "1.5",
                    "spark.driver.cores": "1",
                    "spark.sql.catalogImplementation": "hive",
                    "spark.sql.catalog.glue_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
                    "spark.executor.cores": "1",
                    "spark.sql.catalog.glue_catalog.warehouse": "s3://<bucket-name>/<folder-name>/",
                    "spark.dynamicAllocation.maxExecutors": "20",
                    "spark.sql.catalog.glue_catalog": "org.apache.iceberg.spark.SparkCatalog",
                    "spark.dynamicAllocation.shuffleTracking.enabled": "true",
                    "spark.dynamicAllocation.shuffleTracking.timeout": "300s",
                    "spark.kubernetes.driver.request.cores": "0.5",
                    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
                    "spark.kubernetes.allocation.batch.size": "2",
                    "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                    "spark.sql.catalog.glue_catalog.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
                    "spark.dynamicAllocation.minExecutors": "0",
                    "spark.sql.catalog.glue_catalog.lock.table": "myIcebergLockTable",
                    "spark.dynamicAllocation.enabled": "true",
                    "spark.dynamicAllocation.executorAllocationRatio": "1",
                    "spark.jars": "local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"
                }
            }
        ]
    },
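
Since most of the Iceberg settings above share the `spark.sql.catalog.<name>` prefix, it can help to generate the block programmatically when the catalog name or warehouse path changes. The following is a minimal sketch, not an official tool: the helper names `iceberg_spark_defaults` and `configuration_overrides` are hypothetical, and only the Iceberg-related subset of the properties shown above is included.

```python
import json


def iceberg_spark_defaults(catalog, warehouse, lock_table, executor_memory="2G"):
    """Build the Iceberg-related spark-defaults properties for a Glue-backed catalog.

    Hypothetical helper: property names and values are copied from the
    configuration shown above; sizing properties other than executor memory
    are left out for brevity.
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "spark.executor.memory": executor_memory,
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
        f"{prefix}.lock.table": lock_table,
        "spark.jars": "local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
    }


def configuration_overrides(properties):
    """Wrap the properties in the configurationOverrides structure used above."""
    return {
        "configurationOverrides": {
            "applicationConfiguration": [
                {"classification": "spark-defaults", "properties": properties}
            ]
        }
    }


overrides = configuration_overrides(
    iceberg_spark_defaults(
        catalog="glue_catalog",
        warehouse="s3://<bucket-name>/<folder-name>/",  # placeholder, as in the original
        lock_table="myIcebergLockTable",
        executor_memory="4G",
    )
)
print(json.dumps(overrides, indent=4))
```

The printed JSON can then be merged into the managed endpoint configuration alongside the remaining sizing and dynamic allocation properties.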

### Current Setup used for this notebook

- EMR version: emr-6.8.0-latest
- EKS version: 1.21
- Instance type for EKS cluster: m5.xlarge
- Number of instances: 3

## Connect to Glue catalog, check existing databases and tables

In [6]:
# Connect to Glue catalog, check existing databases and tables
spark.sql("show databases like '*'").show(truncate=False)

+---------+
|namespace|
+---------+
|db       |
|db_ps    |
|db_sql   |
|default  |
+---------+



In [7]:
spark.sql("show tables in default").show(truncate=False)

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|default  |customer |false      |
+---------+---------+-----------+



In [8]:
spark.sql("show current namespace").show()

+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|  default|
+-------------+---------+



In [11]:
# Switch the current catalog to glue_catalog
spark.sql("use glue_catalog")

DataFrame[]

# Create new database and table

In [12]:
# Create new database, at this point you can also open Athena and verify the newly created database
spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_db")

DataFrame[]

In [13]:
#Use newly created database as the default database (namespace)
spark.sql("use iceberg_db")

DataFrame[]

In [14]:
#Show the current namespace; notice the catalog is now "glue_catalog" and the namespace is "iceberg_db" instead of "spark_catalog" and "default" as displayed earlier
spark.sql("show current namespace").show()

+------------+----------+
|     catalog| namespace|
+------------+----------+
|glue_catalog|iceberg_db|
+------------+----------+



In [15]:
#Drop the table if it exists, using PySpark (DROP TABLE just drops the metadata and not the underlying data files)
spark.sql("DROP TABLE IF EXISTS customer")

DataFrame[]

In [16]:
#Create table in glue_catalog --> iceberg_db database
spark.sql(" CREATE TABLE IF NOT EXISTS customer (cust_id int, cust_name string, cust_age int, cust_loc string) TBLPROPERTIES('table_type' = 'ICEBERG','format' = 'parquet')")

DataFrame[]
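
The statement above marks the table as Iceberg through `TBLPROPERTIES`. The Iceberg documentation for Spark usually writes the same intent with a `USING iceberg` clause. As a hedged sketch, the hypothetical helper `create_table_sql` below only builds such a DDL string from a column mapping; it assumes the fully qualified name `glue_catalog.iceberg_db.customer` used elsewhere in this notebook and does not run anything against Spark.

```python
def create_table_sql(table, columns, if_not_exists=True):
    """Build a CREATE TABLE statement with a USING iceberg clause.

    Hypothetical helper for illustration; `columns` maps column name to Spark SQL type.
    """
    column_list = ", ".join(f"{name} {dtype}" for name, dtype in columns.items())
    guard = "IF NOT EXISTS " if if_not_exists else ""
    return f"CREATE TABLE {guard}{table} ({column_list}) USING iceberg"


ddl = create_table_sql(
    "glue_catalog.iceberg_db.customer",
    {"cust_id": "int", "cust_name": "string", "cust_age": "int", "cust_loc": "string"},
)
print(ddl)
# In the notebook this string would be passed to spark.sql(ddl).
```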

# DML Operations
## INSERT, UPDATE, DELETE records in Iceberg table

### INSERT

In [17]:
#Let's insert a few records into the Iceberg table
spark.sql("INSERT INTO customer VALUES (1,'Prasad Nadig', 25, 'NJ')")
spark.sql("INSERT INTO customer VALUES (2,'Ethereum', 80, 'NY')")
spark.sql("INSERT INTO customer VALUES (3,'Cosmos', 25, 'PA')")
spark.sql("INSERT INTO customer VALUES (4,'Solana', 55, 'MD')")
spark.sql("INSERT INTO customer VALUES (5,'Carnado', 15, 'TX')")
spark.sql("INSERT INTO customer VALUES (6,'Link', 45, 'NJ')")

DataFrame[]
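
Each `INSERT` statement above is committed separately, which is why the snapshots listing later in this notebook shows one `append` per statement. If a single commit is preferred, the rows can be combined into one multi-row `INSERT`. The sketch below only builds the SQL string; `sql_literal` and `multi_row_insert` are hypothetical helpers, and the backslash escaping assumes Spark's default string-literal parsing.

```python
def sql_literal(value):
    """Render a Python value as a Spark SQL literal (illustrative, not exhaustive)."""
    if value is None:
        return "NULL"
    if isinstance(value, str):
        # Assumes Spark's default parser, where backslash escapes quotes in literals.
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"
    return str(value)


def multi_row_insert(table, rows):
    """Build one INSERT statement carrying all rows, so they land in a single commit."""
    tuples = ", ".join(
        "(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO {table} VALUES {tuples}"


rows = [
    (1, "Prasad Nadig", 25, "NJ"),
    (2, "Ethereum", 80, "NY"),
    (3, "Cosmos", 25, "PA"),
]
statement = multi_row_insert("customer", rows)
print(statement)
# In the notebook this string would be passed to spark.sql(statement).
```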

In [19]:
# Verify if the insert was successful
spark.sql("SELECT * FROM customer ORDER BY cust_id").show()

+-------+------------+--------+--------+
|cust_id|   cust_name|cust_age|cust_loc|
+-------+------------+--------+--------+
|      1|Prasad Nadig|      25|      NJ|
|      2|    Ethereum|      80|      NY|
|      3|      Cosmos|      25|      PA|
|      4|      Solana|      55|      MD|
|      5|     Carnado|      15|      TX|
|      6|        Link|      45|      NJ|
+-------+------------+--------+--------+



### UPDATE

In [20]:
#Similar to traditional RDBMS, you can issue UPDATE statements to Iceberg tables on Datalakes
spark.sql("UPDATE customer SET cust_age = 50 WHERE cust_id = 6") 

DataFrame[]

In [21]:
#Verify updates
spark.sql("SELECT * FROM customer WHERE cust_id = 6 ORDER BY cust_id").show()

+-------+---------+--------+--------+
|cust_id|cust_name|cust_age|cust_loc|
+-------+---------+--------+--------+
|      6|     Link|      50|      NJ|
+-------+---------+--------+--------+



### DELETE

In [22]:
spark.sql("DELETE FROM customer WHERE cust_id = 4")

DataFrame[]

In [23]:
#Verify delete, should return ZERO records
spark.sql("SELECT * FROM customer WHERE cust_id = 4 ORDER BY cust_id").show()

+-------+---------+--------+--------+
|cust_id|cust_name|cust_age|cust_loc|
+-------+---------+--------+--------+
+-------+---------+--------+--------+



## Snapshots
### Iceberg creates a snapshot for every DML operation; the operation type is captured in the "operation" column

In [24]:
spark.sql(" SELECT * FROM glue_catalog.iceberg_db.customer.snapshots ").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2022-11-09 12:10:...|2174872175979965013|               null|   append|s3://emr-studio-e...|{spark.app.id -> ...|
|2022-11-09 12:10:...|2816279682983579927|2174872175979965013|   append|s3://emr-studio-e...|{spark.app.id -> ...|
|2022-11-09 12:10:...|7772798101801262353|2816279682983579927|   append|s3://emr-studio-e...|{spark.app.id -> ...|
|2022-11-09 12:10:...|5029206383255222294|7772798101801262353|   append|s3://emr-studio-e...|{spark.app.id -> ...|
|2022-11-09 12:10:...|7465481207241770807|5029206383255222294|   append|s3://emr-studio-e...|{spark.app.id -> ...|
|2022-11-09 12:10:...|2056239600440729870|7465481207241770807|   append|s3://emr

## Time Travel

In [25]:
#Insert a new record; with time travel we can go back to a point in time before the new record existed
spark.sql("INSERT INTO customer VALUES (7,'Polygon', 60, 'CT')")

DataFrame[]

In [31]:
#Now let's do a time travel to go back in time before this new record was inserted
spark.sql(" SELECT * FROM customer TIMESTAMP AS OF '2022-11-9 12:11:00' ORDER BY cust_id ").show()

+-------+------------+--------+--------+
|cust_id|   cust_name|cust_age|cust_loc|
+-------+------------+--------+--------+
|      1|Prasad Nadig|      25|      NJ|
|      2|    Ethereum|      80|      NY|
|      3|      Cosmos|      25|      PA|
|      4|      Solana|      55|      MD|
|      5|     Carnado|      15|      TX|
|      6|        Link|      45|      NJ|
+-------+------------+--------+--------+
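
A `TIMESTAMP AS OF` query reads the snapshot that was current at the given time, that is, the latest snapshot committed at or before that timestamp. The pure-Python sketch below mimics that lookup over a list of `(committed_at, snapshot_id)` pairs such as the ones returned by the `snapshots` metadata table. The timestamps and snapshot IDs are made up for illustration, and `snapshot_as_of` is a hypothetical helper, not an Iceberg API. The resolved ID is then placed in a `VERSION AS OF` query, the snapshot-ID form of time travel in Spark 3.3.

```python
from bisect import bisect_right
from datetime import datetime


def snapshot_as_of(snapshots, as_of):
    """Return the ID of the latest snapshot committed at or before `as_of`.

    `snapshots` is a list of (committed_at, snapshot_id) sorted by committed_at.
    """
    committed = [ts for ts, _ in snapshots]
    idx = bisect_right(committed, as_of)
    if idx == 0:
        raise ValueError(f"no snapshot at or before {as_of}")
    return snapshots[idx - 1][1]


# Made-up history for illustration only.
history = [
    (datetime(2022, 11, 9, 12, 10, 30), 101),
    (datetime(2022, 11, 9, 12, 12, 0), 102),
    (datetime(2022, 11, 9, 12, 14, 30), 103),
]

resolved = snapshot_as_of(history, datetime(2022, 11, 9, 12, 11, 0))
query = f"SELECT * FROM customer VERSION AS OF {resolved} ORDER BY cust_id"
print(query)
# In the notebook this string would be passed to spark.sql(query).show().
```

Querying by snapshot ID can be preferable when the exact commit is known, since it does not depend on choosing a timestamp that falls between two commits.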



In [32]:
#Update a record
spark.sql("UPDATE customer SET cust_age = 60 WHERE cust_id = 3") 

DataFrame[]

In [34]:
#Now let's time travel to a point before this record was updated to see the old value
spark.sql(" SELECT * FROM customer TIMESTAMP AS OF '2022-11-9 12:14:00' WHERE cust_id = 3 ORDER BY cust_id ").show()

+-------+---------+--------+--------+
|cust_id|cust_name|cust_age|cust_loc|
+-------+---------+--------+--------+
|      3|   Cosmos|      25|      PA|
+-------+---------+--------+--------+



In [35]:
#Iceberg also allows you to see the deleted records with time travel, let's try it out
spark.sql("DELETE FROM customer WHERE cust_id = 2")

DataFrame[]

In [36]:
#Let's time travel to a timestamp before the delete to see that the deleted record still exists in the table at that point
spark.sql(" SELECT * FROM customer TIMESTAMP AS OF '2022-11-9 12:15:00' WHERE cust_id = 2 ORDER BY cust_id ").show()

+-------+---------+--------+--------+
|cust_id|cust_name|cust_age|cust_loc|
+-------+---------+--------+--------+
|      2| Ethereum|      80|      NY|
+-------+---------+--------+--------+



In [37]:
#In comparison to the above time travel query, let's check the current records in the table; notice cust_id 2 doesn't exist anymore
spark.sql("SELECT * FROM customer ORDER BY cust_id").show()

+-------+------------+--------+--------+
|cust_id|   cust_name|cust_age|cust_loc|
+-------+------------+--------+--------+
|      1|Prasad Nadig|      25|      NJ|
|      3|      Cosmos|      60|      PA|
|      5|     Carnado|      15|      TX|
|      6|        Link|      50|      NJ|
|      7|     Polygon|      60|      CT|
+-------+------------+--------+--------+
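
The introduction lists rollback as a feature, but the time travel queries above only read old data without changing the table. Iceberg's Spark SQL extensions provide stored procedures for actually moving the table back, `rollback_to_snapshot` and `rollback_to_timestamp`, invoked with `CALL <catalog>.system.<procedure>(...)`. The sketch below only builds those statements; the helper names are hypothetical and the snapshot ID is a placeholder, not a value from this notebook.

```python
from datetime import datetime


def rollback_to_snapshot_sql(catalog, table, snapshot_id):
    """Build a CALL statement that rolls `table` back to a specific snapshot ID."""
    return f"CALL {catalog}.system.rollback_to_snapshot('{table}', {int(snapshot_id)})"


def rollback_to_timestamp_sql(catalog, table, ts):
    """Build a CALL statement that rolls `table` back to the snapshot current at `ts`."""
    stamp = ts.strftime("%Y-%m-%d %H:%M:%S")
    return f"CALL {catalog}.system.rollback_to_timestamp('{table}', TIMESTAMP '{stamp}')"


by_id = rollback_to_snapshot_sql("glue_catalog", "iceberg_db.customer", 101)  # 101 is a placeholder ID
by_time = rollback_to_timestamp_sql(
    "glue_catalog", "iceberg_db.customer", datetime(2022, 11, 9, 12, 15, 0)
)
print(by_id)
print(by_time)
# In the notebook either string would be passed to spark.sql(...).
```

Unlike a time travel query, a rollback changes what every subsequent reader of the table sees, so it is worth confirming the target snapshot with a time travel query first.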



## Schema Evolution
### Add Columns

In [38]:
#Add a new column to the customer table using PySpark
spark.sql("ALTER TABLE customer ADD COLUMNS (phone_no int)")

DataFrame[]

In [39]:
spark.sql("DESC TABLE customer").show()

+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|        cust_id|      int|       |
|      cust_name|   string|       |
|       cust_age|      int|       |
|       cust_loc|   string|       |
|       phone_no|      int|       |
|               |         |       |
| # Partitioning|         |       |
|Not partitioned|         |       |
+---------------+---------+-------+



In [40]:
#Insert new record with a value for Phone_no
spark.sql("INSERT INTO customer VALUES(8,'Avalanche',20,'NY',1234567890)")

DataFrame[]

In [41]:
# Query customer table and check if the new column was populated
spark.sql("SELECT * FROM customer ORDER BY cust_id").show()

+-------+------------+--------+--------+----------+
|cust_id|   cust_name|cust_age|cust_loc|  phone_no|
+-------+------------+--------+--------+----------+
|      1|Prasad Nadig|      25|      NJ|      null|
|      3|      Cosmos|      60|      PA|      null|
|      5|     Carnado|      15|      TX|      null|
|      6|        Link|      50|      NJ|      null|
|      7|     Polygon|      60|      CT|      null|
|      8|   Avalanche|      20|      NY|1234567890|
+-------+------------+--------+--------+----------+



### Rename Column

In [42]:
#Rename an existing column
spark.sql("ALTER TABLE customer RENAME COLUMN phone_no TO cust_phone_no")

DataFrame[]

In [43]:
spark.sql("DESC TABLE customer").show()

+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|        cust_id|      int|       |
|      cust_name|   string|       |
|       cust_age|      int|       |
|       cust_loc|   string|       |
|  cust_phone_no|      int|       |
|               |         |       |
| # Partitioning|         |       |
|Not partitioned|         |       |
+---------------+---------+-------+



### Drop Column

In [44]:
#Drop an existing column, this should drop the column in metadata only
spark.sql("ALTER TABLE customer DROP COLUMN cust_phone_no")

DataFrame[]

In [45]:
spark.sql("DESC TABLE customer").show()

+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|        cust_id|      int|       |
|      cust_name|   string|       |
|       cust_age|      int|       |
|       cust_loc|   string|       |
|               |         |       |
| # Partitioning|         |       |
|Not partitioned|         |       |
+---------------+---------+-------+



In [47]:
#Similar to record deletion, with time travel we can also see the column that was dropped, pretty powerful isn't it?
spark.sql(" SELECT * FROM customer TIMESTAMP AS OF '2022-11-9 12:21:00' ORDER BY cust_id ").show()

+-------+------------+--------+--------+----------+
|cust_id|   cust_name|cust_age|cust_loc|  phone_no|
+-------+------------+--------+--------+----------+
|      1|Prasad Nadig|      25|      NJ|      null|
|      3|      Cosmos|      60|      PA|      null|
|      5|     Carnado|      15|      TX|      null|
|      6|        Link|      50|      NJ|      null|
|      7|     Polygon|      60|      CT|      null|
|      8|   Avalanche|      20|      NY|1234567890|
+-------+------------+--------+--------+----------+
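
Adding, renaming, and dropping columns are metadata-only changes because Iceberg tracks each column by a unique field ID rather than by name or position. The toy model below illustrates that idea in plain Python. It is a simplified sketch, not Iceberg's implementation: `TinyTable` and its methods are hypothetical, and stored rows stand in for data files that are never rewritten.

```python
class TinyTable:
    """Toy model of ID-based column tracking; not Iceberg's actual implementation."""

    def __init__(self, column_names):
        self._next_id = 1
        self.schema = {}  # field ID -> current column name
        self.rows = []    # stored rows: field ID -> value (never rewritten)
        for name in column_names:
            self.add_column(name)

    def add_column(self, name):
        # A new column gets a fresh ID; existing rows are untouched.
        self.schema[self._next_id] = name
        self._next_id += 1

    def rename_column(self, old, new):
        # Only the name attached to the ID changes.
        field_id = next(i for i, n in self.schema.items() if n == old)
        self.schema[field_id] = new

    def drop_column(self, name):
        # The ID leaves the current schema; stored values stay where they are.
        field_id = next(i for i, n in self.schema.items() if n == name)
        del self.schema[field_id]

    def insert(self, *values):
        self.rows.append(dict(zip(self.schema.keys(), values)))

    def read(self, schema=None):
        # Project stored rows through a schema; missing IDs read as None (null).
        schema = self.schema if schema is None else schema
        return [{name: row.get(fid) for fid, name in schema.items()} for row in self.rows]


table = TinyTable(["cust_id", "cust_name"])
table.insert(7, "Polygon")
table.add_column("phone_no")
table.insert(8, "Avalanche", 1234567890)
schema_before_drop = dict(table.schema)  # what a time travel read would use
table.rename_column("phone_no", "cust_phone_no")
table.drop_column("cust_phone_no")

print(table.read())                    # current schema: no phone column
print(table.read(schema_before_drop))  # older schema: phone_no is visible again
```

Reading with the older schema mirrors the time travel query above: the row written before the column existed shows null for `phone_no`, while the later row still carries its value.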

