## Reference Material

[EMR Iceberg Documentation](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg.html)


## Configuration

EMR Cluster details used for creating this notebook:

Node: m5.xlarge (single node)
EMR Version: emr-6.8.0
Iceberg Version: 0.14.0-amzn-0
Spark Version: 3.3.0

#### To run this notebook:
> 1. Launch an EMR 6.5+ cluster in one of the subnets on which this EMR Studio is running. (**Iceberg support started with EMR 6.5.0**)
> 2. Launch the cluster with the following configuration classifications:
        [
          {
            "Classification": "iceberg-defaults",
            "Properties": {
              "iceberg.enabled":"true"
            }
          },
          {
            "Classification": "spark-hive-site",
            "Properties": {
              "hive.metastore.client.factory.class":
                 "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            }
          }
        ]
**Note:** The first classification enables Iceberg. The second one configures Glue Catalog as the Metastore for Spark applications in this cluster.
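
The two classifications can also be generated programmatically, which makes it harder for stray whitespace to slip into a classification name. The following is a minimal sketch using only the Python standard library; the function name `build_emr_configurations` is hypothetical, and the printed string is meant to be passed as the value of the `--configurations` argument of `aws emr create-cluster`.

```python
import json


def build_emr_configurations():
    """Return the two configuration classifications described above."""
    return [
        {
            # Enables Iceberg on the cluster
            "Classification": "iceberg-defaults",
            "Properties": {"iceberg.enabled": "true"},
        },
        {
            # Configures Glue Catalog as the metastore for Spark applications
            "Classification": "spark-hive-site",
            "Properties": {
                "hive.metastore.client.factory.class":
                    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            },
        },
    ]


# Compact JSON with no spaces, convenient to quote on a single shell line
configurations_json = json.dumps(build_emr_configurations(), separators=(",", ":"))
print(configurations_json)
```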

Here is a sample CLI command to create an EMR cluster. Replace `<EMR-STUDIO-SUBNET>` with one of the subnets in which your EMR Studio is running:

      aws emr create-cluster \
          --name iceberg-emr-cluster \
          --use-default-roles \
          --release-label emr-6.8.0 \
          --instance-count 1 \
          --instance-type m5.xlarge \
          --applications Name=Hadoop Name=Livy Name=Spark Name=JupyterEnterpriseGateway \
          --ec2-attributes SubnetId=<EMR-STUDIO-SUBNET> \
          --configurations '[{"Classification":"iceberg-defaults","Properties":{"iceberg.enabled":"true"}},{"Classification":"spark-hive-site","Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]'
         

Update the S3 path in `"spark.sql.catalog.dev.warehouse"` in the configuration below so that it points to your own bucket:

In [1]:
%%configure -f
{
    "conf":{
        "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dev.warehouse":"s3://emr-studio-demo-s3bucket-5he9azv0zpew/iceberg/",
        "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.dev.lock-impl":"org.apache.iceberg.aws.glue.DynamoLockManager",
        "spark.sql.catalog.dev.lock.table":"myGlueLockTable"
        }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1665933496980_0001,pyspark,idle,Link,Link,,
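
Because the warehouse path and the bucket name have to agree, it can help to derive the configuration from a single bucket variable. The sketch below is one way to do that in plain Python; the helper name `iceberg_session_conf` and its parameters are hypothetical, while the property keys and class names are copied from the `%%configure` cell above. The printed JSON can be pasted into a `%%configure -f` cell.

```python
import json


def iceberg_session_conf(bucket, catalog="dev", prefix="iceberg/",
                         lock_table="myGlueLockTable"):
    """Build the %%configure payload with the warehouse derived from `bucket`."""
    key = f"spark.sql.catalog.{catalog}"
    return {
        "conf": {
            "spark.sql.extensions":
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            key: "org.apache.iceberg.spark.SparkCatalog",
            f"{key}.warehouse": f"s3://{bucket}/{prefix}",
            f"{key}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
            f"{key}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
            f"{key}.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
            f"{key}.lock.table": lock_table,
        }
    }


# Bucket name taken from this notebook; replace it with your own
session_conf = iceberg_session_conf("emr-studio-demo-s3bucket-5he9azv0zpew")
print(json.dumps(session_conf, indent=4))
```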


Set a variable equal to the name of the S3 bucket to read / write from

In [2]:
s3_bucket_name = "emr-studio-demo-s3bucket-5he9azv0zpew"

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
10,application_1665933496980_0011,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [3]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

from datetime import datetime

## Create an Iceberg Table and run SQL queries using PySpark and ANSI SQL
### Each step is shown in both SQL and PySpark. Use either SQL or PySpark end to end; do not mix the two, so that the flow stays consistent

In [4]:
%%sql

--#set dev catalog as default using SQL
use dev

In [11]:
#set dev catalog as default using PySpark
spark.sql("use dev")

DataFrame[]

In [5]:
%%sql

--#show current catalog and namespace using SQL (there is no namespace yet as we have not created any database)
show current namespace

In [12]:
#show current catalog and namespace using PySpark (there is no namespace yet as we have not created any database)
spark.sql("show current namespace").show()

+-------+---------+
|catalog|namespace|
+-------+---------+
|    dev|   db_sql|
+-------+---------+

In [6]:
%%sql
--#create a database called "db_sql" using SQL
CREATE DATABASE IF NOT EXISTS db_sql;

In [13]:
#create a database called "db_ps" using PySpark
spark.sql("CREATE DATABASE IF NOT EXISTS db_ps")

DataFrame[]

In [62]:
%%sql
--#use newly created database as the default database using SQL
use db_sql

In [63]:
%%sql
--#show current namespace using SQL and notice now the namespace is "db_sql" instead of "NaT" as displayed earlier
show current namespace

In [14]:
#use newly created database as the default database using PySpark
spark.sql("use db_ps")

DataFrame[]

In [15]:
#show current namespace using PySpark and notice now the namespace is "db_ps" instead of "NaT" as displayed earlier
spark.sql("show current namespace").show()

+-------+---------+
|catalog|namespace|
+-------+---------+
|    dev|    db_ps|
+-------+---------+

In [9]:
%%sql
--#drop table if it exists using SQL
drop table if exists dev.db_sql.iceberg_table

In [45]:
#drop table if it exists using PySpark
spark.sql("drop table if exists dev.db_ps.iceberg_table")

DataFrame[]

In [10]:
%%sql
--#Create table in dev catalog --> db_sql database using SQL
CREATE TABLE IF NOT EXISTS dev.db_sql.iceberg_table 
    (id string, 
     name string, 
     create_date string, 
     last_update_time string
    )
USING iceberg

In [46]:
#Create table in dev catalog --> db_ps database using PySpark
spark.sql(" CREATE TABLE IF NOT EXISTS dev.db_ps.iceberg_table (id string, name string, create_date string, last_update_time string) USING iceberg")

DataFrame[]

In [None]:
#Create table in dev catalog --> db_ps database using PySpark with an explicit LOCATION. This is NOT required: when no path is given, the table location is derived from the warehouse config
#spark.sql(" CREATE TABLE IF NOT EXISTS dev.db_ps.iceberg_table (id string, name string, create_date string, last_update_time string) USING iceberg LOCATION 's3://" + s3_bucket_name + "/iceberg/db/iceberg_table' ")

In [22]:
%%sql
--#List all databases within the dev catalog using SQL (dev is set as the default catalog, so "in dev" is optional; both forms are shown)
--show databases
show databases in dev

In [19]:
# List all databases within the dev catalog using PySpark
spark.sql(" SHOW DATABASES in dev ").show()

+---------+
|namespace|
+---------+
|       db|
|    db_ps|
|   db_sql|
|  default|
+---------+

In [26]:
%%sql
--#List all tables within the dev catalog and db_sql database using SQL (dev.db_sql is the current namespace, so "in dev.db_sql" is optional; both forms are shown)
--show tables
show tables in dev.db_sql

In [27]:
# List all tables within the dev catalog and db_ps database using PySpark
spark.sql(" SHOW TABLES IN dev.db_ps ").show()

+---------+-------------+-----------+
|namespace|    tableName|isTemporary|
+---------+-------------+-----------+
|    db_ps|iceberg_table|      false|
+---------+-------------+-----------+

## DML Operations
> ## Using PySpark
> ## Create a DataFrame

In [47]:
data = [
        (1, "Chris", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (2, "Will", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (3, "Emma", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (4, "John", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (5, "Eric", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')),
        (6, "Adam", "2020-01-01", datetime.strptime('2020-01-01 00:00:00', '%Y-%m-%d %H:%M:%S'))
]

schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False)    
])

inputDF = spark.createDataFrame(data=data,schema=schema)

## Write Data to Iceberg Table

In [48]:
inputDF.writeTo("dev.db_ps.iceberg_table").append()

## Read Data from Iceberg Table

In [49]:
spark.sql(" SELECT * FROM dev.db_ps.iceberg_table ").show() 

+---+-----+-----------+-------------------+
| id| name|create_date|   last_update_time|
+---+-----+-----------+-------------------+
|  1|Chris| 2020-01-01|2020-01-01 00:00:00|
|  2| Will| 2020-01-01|2020-01-01 00:00:00|
|  3| Emma| 2020-01-01|2020-01-01 00:00:00|
|  4| John| 2020-01-01|2020-01-01 00:00:00|
|  5| Eric| 2020-01-01|2020-01-01 00:00:00|
|  6| Adam| 2020-01-01|2020-01-01 00:00:00|
+---+-----+-----------+-------------------+

## Update, Delete and Insert Data to Iceberg Table

In [50]:
data = [
        (1, "Christopher", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "update"),
        (3, "Emmeline", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "update"),
        (5, "Eric", "2020-01-01", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "delete"),
        (7, "Prasad", "2020-01-02", datetime.strptime('2020-01-02 00:00:00', '%Y-%m-%d %H:%M:%S'), "append")
]

schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False),
        StructField("change_type", StringType(), False)
])

mergeDF = spark.createDataFrame(data=data,schema=schema)

mergeDF.createOrReplaceTempView("mergeTable")

In [None]:
# spark.sql(" DROP TABLE mergeTable ")

In [51]:
spark.sql(""" 
    MERGE INTO 
        dev.db_ps.iceberg_table t 
    USING 
        (SELECT * FROM mergeTable) s 
    ON 
        t.id = s.id
    WHEN MATCHED AND s.change_type = 'update' THEN UPDATE SET t.name = s.name, t.last_update_time = s.last_update_time 
    WHEN MATCHED AND s.change_type = 'delete' THEN DELETE
    WHEN NOT MATCHED THEN INSERT *
    """)

DataFrame[]

In [52]:
spark.sql(" SELECT * FROM dev.db_ps.iceberg_table ORDER BY id ").show()

+---+-----------+-----------+-------------------+
| id|       name|create_date|   last_update_time|
+---+-----------+-----------+-------------------+
|  1|Christopher| 2020-01-01|2020-01-02 00:00:00|
|  2|       Will| 2020-01-01|2020-01-01 00:00:00|
|  3|   Emmeline| 2020-01-01|2020-01-02 00:00:00|
|  4|       John| 2020-01-01|2020-01-01 00:00:00|
|  6|       Adam| 2020-01-01|2020-01-01 00:00:00|
|  7|     Prasad| 2020-01-02|2020-01-02 00:00:00|
+---+-----------+-----------+-------------------+
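
To make the effect of the three `WHEN` clauses easier to follow, here is a plain-Python model of the `MERGE INTO` statement above. It is only an illustration of the row-level semantics, not how Spark or Iceberg executes the merge; the function name `apply_merge` is hypothetical, timestamps are kept as strings for brevity, and the sketch assumes each `id` appears at most once in the change set.

```python
def apply_merge(target, changes):
    """Model MERGE INTO ... ON t.id = s.id with the WHEN clauses used above."""
    merged = {row["id"]: dict(row) for row in target}
    for change in changes:
        key = change["id"]
        if key in merged:
            if change["change_type"] == "update":
                # WHEN MATCHED AND change_type = 'update' THEN UPDATE SET ...
                merged[key]["name"] = change["name"]
                merged[key]["last_update_time"] = change["last_update_time"]
            elif change["change_type"] == "delete":
                # WHEN MATCHED AND change_type = 'delete' THEN DELETE
                del merged[key]
            # Matched rows with any other change_type are left untouched
        else:
            # WHEN NOT MATCHED THEN INSERT *: keep only the target's columns
            merged[key] = {k: v for k, v in change.items() if k != "change_type"}
    return [merged[k] for k in sorted(merged)]


names = ["Chris", "Will", "Emma", "John", "Eric", "Adam"]
target = [
    {"id": i, "name": n, "create_date": "2020-01-01",
     "last_update_time": "2020-01-01 00:00:00"}
    for i, n in enumerate(names, start=1)
]
changes = [
    {"id": 1, "name": "Christopher", "create_date": "2020-01-01",
     "last_update_time": "2020-01-02 00:00:00", "change_type": "update"},
    {"id": 3, "name": "Emmeline", "create_date": "2020-01-01",
     "last_update_time": "2020-01-02 00:00:00", "change_type": "update"},
    {"id": 5, "name": "Eric", "create_date": "2020-01-01",
     "last_update_time": "2020-01-02 00:00:00", "change_type": "delete"},
    {"id": 7, "name": "Prasad", "create_date": "2020-01-02",
     "last_update_time": "2020-01-02 00:00:00", "change_type": "append"},
]

result = apply_merge(target, changes)
for row in result:
    print(row["id"], row["name"], row["last_update_time"])
```

The modeled result has the same ids and names as the query output above: ids 1 and 3 are updated, id 5 is removed, and id 7 is inserted.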

## Snapshots

In [53]:
spark.sql(" SELECT * FROM dev.db_ps.iceberg_table.snapshots ").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2022-10-16 21:00:...|8090240036993000156|               null|   append|s3://emr-studio-d...|{spark.app.id -> ...|
|2022-10-16 21:01:...|6617853690041842647|8090240036993000156|overwrite|s3://emr-studio-d...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+

In [54]:
# Add a new record
data = [
        (8, "Bill", "2020-01-02", datetime.strptime('2020-01-03 00:00:00', '%Y-%m-%d %H:%M:%S'))
]

schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False)    
])

appendDF = spark.createDataFrame(data=data,schema=schema)

appendDF.writeTo("dev.db_ps.iceberg_table").append()

## Time Travel

In [56]:
# Query current table as a point of comparison
spark.sql(" SELECT * FROM dev.db_ps.iceberg_table ORDER BY ID ").show()

+---+-----------+-----------+-------------------+
| id|       name|create_date|   last_update_time|
+---+-----------+-----------+-------------------+
|  1|Christopher| 2020-01-01|2020-01-02 00:00:00|
|  2|       Will| 2020-01-01|2020-01-01 00:00:00|
|  3|   Emmeline| 2020-01-01|2020-01-02 00:00:00|
|  4|       John| 2020-01-01|2020-01-01 00:00:00|
|  6|       Adam| 2020-01-01|2020-01-01 00:00:00|
|  7|     Prasad| 2020-01-02|2020-01-02 00:00:00|
|  8|       Bill| 2020-01-02|2020-01-03 00:00:00|
+---+-----------+-----------+-------------------+

In [57]:
spark.sql(" SELECT * FROM dev.db_ps.iceberg_table TIMESTAMP AS OF '2022-10-16 21:01:00' ").show()

+---+-----+-----------+-------------------+
| id| name|create_date|   last_update_time|
+---+-----+-----------+-------------------+
|  1|Chris| 2020-01-01|2020-01-01 00:00:00|
|  2| Will| 2020-01-01|2020-01-01 00:00:00|
|  3| Emma| 2020-01-01|2020-01-01 00:00:00|
|  4| John| 2020-01-01|2020-01-01 00:00:00|
|  5| Eric| 2020-01-01|2020-01-01 00:00:00|
|  6| Adam| 2020-01-01|2020-01-01 00:00:00|
+---+-----+-----------+-------------------+
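
`TIMESTAMP AS OF` returns the table as it was at the given time, which amounts to reading the most recent snapshot committed at or before that timestamp. The sketch below models that selection in plain Python. The function name `snapshot_as_of` is hypothetical, and the seconds in the commit times are invented for illustration, because the snapshot listing above truncates them.

```python
from datetime import datetime


def snapshot_as_of(snapshots, ts):
    """Return the latest snapshot committed at or before `ts`."""
    eligible = [s for s in snapshots if s["committed_at"] <= ts]
    if not eligible:
        raise ValueError(f"no snapshot exists at or before {ts}")
    return max(eligible, key=lambda s: s["committed_at"])


# Commit minutes follow the snapshot listing above; the seconds are hypothetical
snapshots = [
    {"operation": "append", "committed_at": datetime(2022, 10, 16, 21, 0, 30)},
    {"operation": "overwrite", "committed_at": datetime(2022, 10, 16, 21, 1, 30)},
]

visible = snapshot_as_of(snapshots, datetime(2022, 10, 16, 21, 1, 0))
print(visible["operation"])
```

Under these assumed commit times, a query as of 21:01:00 falls between the two commits, so it reads the initial append snapshot and shows the rows from before the merge, as in the output above.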

## DML Operations
> ## Using SQL

> ## Inserts

In [65]:
%%sql
INSERT INTO dev.db_sql.iceberg_table VALUES
        (1, "Chris", "2020-01-01", current_timestamp()),
        (2, "Will", "2020-01-01", current_timestamp()),
        (3, "Emma", "2020-01-01", current_timestamp()),
        (4, "John", "2020-01-01", current_timestamp()),
        (5, "Eric", "2020-01-01", current_timestamp()),
        (6, "Adam", "2020-01-01", current_timestamp())
        

In [66]:
%%sql
--#Query table iceberg_table in db_sql database and see if the above insert statement actually inserted records or not
SELECT * FROM dev.db_sql.iceberg_table;

In [67]:
%%sql
--#The DML statement above created a snapshot; let's check the snapshots and confirm the operation is append, as it was a fresh insert
SELECT * FROM dev.db_sql.iceberg_table.snapshots;

> ## Updates

In [70]:
%%sql
--#Let's update a record and observe the snapshot
UPDATE dev.db_sql.iceberg_table
SET name = "Prasad"
WHERE id = 2

In [72]:
%%sql
--#Query the table and check if the record was updated with the new name
select * from dev.db_sql.iceberg_table order by id

In [73]:
%%sql
--#The DML statement above added a new snapshot; let's check the snapshots and see if the operation is overwrite, as it was an UPDATE statement
SELECT * FROM dev.db_sql.iceberg_table.snapshots;

> ## Deletes

In [75]:
%%sql
--# Let's delete a couple of records and see what happens
DELETE FROM dev.db_sql.iceberg_table
where id in (5,4)

In [76]:
%%sql
--#Query the table and check if the records were Deleted, should not see records with id 4 and 5
select * from dev.db_sql.iceberg_table order by id

In [77]:
%%sql
--#The DML statement above added a new snapshot; let's check the snapshots and see if the operation is overwrite, as it was a DELETE statement; it will add a delete marker
SELECT * FROM dev.db_sql.iceberg_table.snapshots;

In [78]:
%%sql
--#Manifest shows the number of deleted files
SELECT * FROM dev.db_sql.iceberg_table.manifests

> ## UPSERT
> UPSERT is a very common scenario in traditional data warehousing, where it is typically used to apply "Change Data Capture" (CDC) feeds: changed or updated rows, new rows, and rows marked for deletion at the source are applied to the target data warehouse in one pass. That target has typically been a database; with the help of Iceberg, the same CDC pattern can be applied to data in a modern data lake such as Amazon S3.

In [84]:
%%sql
--# Let's create a new table with CDC data
CREATE TABLE IF NOT EXISTS dev.db_sql.iceberg_table_updates
    (id Integer,
     name String, 
     create_date String,             
     last_update_time Timestamp,
     change_type String)
using iceberg

In [85]:
%%sql
INSERT INTO dev.db_sql.iceberg_table_updates VALUES
        (1, "Christopher", "2020-01-01", current_timestamp(), "update"),
        (3, "Emmeline", "2020-01-01", current_timestamp(), "update"),
        (6, "Adam", "2020-01-01", current_timestamp(), "delete"),
        (7, "Scott", "2020-01-02", current_timestamp(), "append")

In [87]:
%%sql
--#Let's Insert, Update and Delete records All in one go
MERGE INTO dev.db_sql.iceberg_table tgt
USING dev.db_sql.iceberg_table_updates src
ON tgt.id = src.id
WHEN MATCHED and src.change_type = 'update' THEN
    UPDATE SET
        id = src.id,
        name = src.name,
        create_date = src.create_date,
        last_update_time = src.last_update_time
WHEN MATCHED and src.change_type = 'delete' THEN DELETE
when NOT MATCHED then INSERT *

In [88]:
%%sql
--#Query the table to check if CDC was applied to the target table on the S3 data lake
--# it should update Chris to Christopher and Emma to Emmeline,
--# delete id# 6 Adam, and
--# insert a new record Scott id# 7
SELECT * FROM dev.db_sql.iceberg_table order by id;

In [91]:
%%sql
--#query the metadata itself to see the file_format, size, lower and upper bounds etc
SELECT * FROM dev.db_sql.iceberg_table.files;

## Time Travel

In [95]:
%%sql
SELECT * FROM dev.db_sql.iceberg_table TIMESTAMP AS OF "2022-10-16 21:59:00" order by id

## Schema Evolution

> ## Add Columns

In [117]:
%%sql
--#Add a new column to iceberg_table table using SQL
ALTER TABLE dev.db_sql.iceberg_table ADD COLUMNS (state string)

In [118]:
%%sql
desc table dev.db_sql.iceberg_table

In [124]:
#Add a new column to iceberg_table table using PySpark
spark.sql("ALTER TABLE dev.db_ps.iceberg_table ADD COLUMNS (state string)")

DataFrame[]

In [125]:
spark.sql("DESC TABLE dev.db_ps.iceberg_table").show()

+----------------+---------+-------+
|        col_name|data_type|comment|
+----------------+---------+-------+
|              id|   string|       |
|            name|   string|       |
|     create_date|   string|       |
|last_update_time|   string|       |
|           state|   string|       |
|                |         |       |
|  # Partitioning|         |       |
| Not partitioned|         |       |
+----------------+---------+-------+

In [104]:
%%sql
--#Insert new record with a value for State using SQL
INSERT INTO dev.db_sql.iceberg_table VALUES
        (8, "Jeff", "2020-01-01", current_timestamp(), "Washington")

In [106]:
%%sql
--# Let's query the table using SQL and check if the new column was populated
SELECT * FROM dev.db_sql.iceberg_table ORDER BY 1;

In [108]:
#Insert new record with a value for State using PySpark
    
data = [
        (8, "Jeff", "2020-01-01", datetime.strptime('2020-01-03 00:00:00', '%Y-%m-%d %H:%M:%S'), "Washington")
]

schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False), 
        StructField("create_date", StringType(), False),             
        StructField("last_update_time", TimestampType(), False),
        StructField("state", StringType(), False)
])

appendDF1 = spark.createDataFrame(data=data,schema=schema)

appendDF1.writeTo("dev.db_ps.iceberg_table").append()

In [111]:
spark.sql("SELECT * FROM dev.db_ps.iceberg_table order by 1").show()

+---+-----------+-----------+-------------------+----------+
| id|       name|create_date|   last_update_time|     state|
+---+-----------+-----------+-------------------+----------+
|  1|Christopher| 2020-01-01|2020-01-02 00:00:00|      null|
|  2|       Will| 2020-01-01|2020-01-01 00:00:00|      null|
|  3|   Emmeline| 2020-01-01|2020-01-02 00:00:00|      null|
|  4|       John| 2020-01-01|2020-01-01 00:00:00|      null|
|  6|       Adam| 2020-01-01|2020-01-01 00:00:00|      null|
|  7|     Prasad| 2020-01-02|2020-01-02 00:00:00|      null|
|  8|       Bill| 2020-01-02|2020-01-03 00:00:00|      null|
|  8|       Jeff| 2020-01-01|2020-01-03 00:00:00|Washington|
+---+-----------+-----------+-------------------+----------+

> ## Rename Columns

In [120]:
%%sql
--#Rename column state to st in iceberg_table table using SQL
ALTER TABLE dev.db_sql.iceberg_table RENAME COLUMN state TO st

In [121]:
%%sql
DESC TABLE dev.db_sql.iceberg_table

In [126]:
#Rename column state to st in iceberg_table table using PySpark
spark.sql("ALTER TABLE dev.db_ps.iceberg_table RENAME COLUMN state TO st")

DataFrame[]

In [127]:
spark.sql("DESC TABLE dev.db_ps.iceberg_table").show()

+----------------+---------+-------+
|        col_name|data_type|comment|
+----------------+---------+-------+
|              id|   string|       |
|            name|   string|       |
|     create_date|   string|       |
|last_update_time|   string|       |
|              st|   string|       |
|                |         |       |
|  # Partitioning|         |       |
| Not partitioned|         |       |
+----------------+---------+-------+

> ## Drop Columns

In [112]:
%%sql
--#Drop column st (renamed from state above) from iceberg_table table using SQL
ALTER TABLE dev.db_sql.iceberg_table DROP COLUMN st

In [113]:
%%sql
--# as shown below, the dropped column is no longer part of the table (BUT the column is still available in the Glue Catalog; Athena still shows this column)
desc table dev.db_sql.iceberg_table

In [114]:
#Drop column st (renamed from state above) from iceberg_table table using PySpark
spark.sql("ALTER TABLE dev.db_ps.iceberg_table DROP COLUMN st")

DataFrame[]

In [116]:
spark.sql("desc table dev.db_ps.iceberg_table").show()

+----------------+---------+-------+
|        col_name|data_type|comment|
+----------------+---------+-------+
|              id|   string|       |
|            name|   string|       |
|     create_date|   string|       |
|last_update_time|   string|       |
|                |         |       |
|  # Partitioning|         |       |
| Not partitioned|         |       |
+----------------+---------+-------+
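
Iceberg tracks columns by a unique field ID rather than by name, which is why the add, rename, and drop operations above are metadata changes that do not rewrite existing data files. The following plain-Python model is a simplified sketch of that idea; the class `TableSchema` and its methods are hypothetical and are not part of any Iceberg API.

```python
class TableSchema:
    """Toy schema that maps stable field IDs to current column names."""

    def __init__(self, column_names):
        self.fields = {fid: name for fid, name in enumerate(column_names, start=1)}
        self._next_id = len(self.fields) + 1

    def _id_of(self, name):
        for fid, current in self.fields.items():
            if current == name:
                return fid
        raise KeyError(f"no such column: {name}")

    def add_column(self, name):
        # A new column always gets a fresh ID, never a reused one
        self.fields[self._next_id] = name
        self._next_id += 1

    def rename_column(self, old, new):
        # Only the name changes; the ID that stored data refers to stays the same
        self.fields[self._id_of(old)] = new

    def drop_column(self, name):
        del self.fields[self._id_of(name)]

    def read(self, stored_row):
        # Stored rows are keyed by field ID; IDs missing from a row read as None
        return {name: stored_row.get(fid) for fid, name in self.fields.items()}


schema = TableSchema(["id", "name", "create_date", "last_update_time"])
old_row = {1: "7", 2: "Prasad", 3: "2020-01-02", 4: "2020-01-02 00:00:00"}

schema.add_column("state")
new_row = {1: "8", 2: "Jeff", 3: "2020-01-01", 4: "2020-01-03 00:00:00", 5: "Washington"}
after_add = [schema.read(old_row), schema.read(new_row)]

schema.rename_column("state", "st")
after_rename = schema.read(new_row)

schema.drop_column("st")
after_drop = schema.read(new_row)

print(after_add[0]["state"], after_rename["st"], list(after_drop))
```

In this model, rows written before the column was added read back with a null in the new column, a rename keeps the stored value under the new name, and a drop simply hides the field from readers.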