# Iceberg Example Using Hive Catalog Notebook

## Topics covered in this example

1) [Configuring Iceberg](#configure_iceberg) <br>
2) [Creating an Iceberg Table](#create_table) <br>
3) [DML Statements](#dml) <br>
&emsp;&emsp;&emsp;&emsp;a) [Inserts](#inserts) <br>
&emsp;&emsp;&emsp;&emsp;b) [Deletes](#deletes) <br>
&emsp;&emsp;&emsp;&emsp;c) [Updates](#updates) <br>
4) [Schema Evolution](#schema_evolution) <br>
&emsp;&emsp;&emsp;&emsp;a) [Adding Columns](#adding_columns) <br>
&emsp;&emsp;&emsp;&emsp;b) [Dropping Columns](#dropping_columns) <br>
5) [Time Travel](#time_travel) <br>
&emsp;&emsp;&emsp;&emsp;a) [Rollback](#rollback) <br>
6) [Partition Evolution](#partition_evolution) <br>

***

## Introduction
Apache Iceberg (https://iceberg.apache.org/) is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink and Hive using a high-performance table format that works just like a SQL table. Iceberg tracks individual data files in a table instead of directories. This allows writers to create data files in place and add them to the table only in an explicit commit. Every time a new file is committed to any partition of the table, a new point-in-time snapshot of all the files is created. At query time, there is no need to list a directory to find the files we need to work with, because the snapshot already records that information at write time. Readers therefore work against a consistent snapshot, which is commonly known as snapshot isolation (https://en.wikipedia.org/wiki/Snapshot_isolation) in databases.
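
To make the snapshot idea concrete, here is a toy in-memory model in plain Python. It is not Iceberg's implementation or API; the class and method names are invented for illustration. Each commit produces a new immutable snapshot holding the complete file list, so a reader resolves files from the snapshot instead of listing a directory.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]
    files: Tuple[str, ...]  # complete file list as of this commit


@dataclass
class ToyTable:
    snapshots: List[Snapshot] = field(default_factory=list)

    def current(self) -> Optional[Snapshot]:
        return self.snapshots[-1] if self.snapshots else None

    def commit_append(self, new_files: List[str]) -> Snapshot:
        # Files may already exist in storage; they become visible only here.
        parent = self.current()
        files = (parent.files if parent else ()) + tuple(new_files)
        parent_id = parent.snapshot_id if parent else None
        snap = Snapshot(len(self.snapshots) + 1, parent_id, files)
        self.snapshots.append(snap)
        return snap


table = ToyTable()
first = table.commit_append(["data/a.parquet"])
second = table.commit_append(["data/b.parquet", "data/c.parquet"])

# A reader holding the first snapshot is unaffected by the later commit.
print(first.files)
print(second.files)
```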

Iceberg supports write, delete, update, and time travel operations with complete support for ACID transactions (https://en.wikipedia.org/wiki/ACID). Table changes are atomic, and readers never see partial or uncommitted changes (serializable isolation (https://en.wikipedia.org/wiki/Isolation_(database_systems)#Serializable)).

The Iceberg table format is an open specification at multiple levels. At the catalog level, you can plug in multiple types of catalogs such as Hive, Hadoop, AWS Glue Data Catalog, etc. All of these can co-exist, and you can join tables across different types of catalogs. In this example, we are going to work with the Hive Data Catalog.

***

## Setup
Create an S3 bucket location to save the sample dataset. In this example we use the path format: s3://<span style="color:red">MYBUCKET</span>/iceberg/<span style="color:red">YOUR-CATALOG-NAME</span>/tables/

    For example: s3://MYBUCKET/iceberg/db/amazon_reviews_iceberg

Replace MYBUCKET with the name of the bucket you create.
***


<a id="configure_iceberg"></a>
## Configuring Iceberg on Spark session


If you don't remember your bucket name, run the CLI command below.

In [None]:
%%sh
aws s3 ls

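Configure your Spark session using the `%%configure` magic command. The configuration registers two catalogs: `spark_catalog`, which wraps the session catalog and is backed by Hive, and `dev`, which holds the Iceberg tables used in the rest of this notebook. Note that `dev` is declared with `type` set to `hadoop` and a warehouse path on S3; if you want `dev` itself to be backed by the Hive metastore, set `spark.sql.catalog.dev.type` to `hive` instead.

(Replace <span style="color:red">MYBUCKET</span> in the configuration below with your S3 bucket name.)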

In [None]:
%%configure -f
{
"conf":{
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.spark_catalog.type":"hive",
    "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.type":"hadoop",
    "spark.sql.catalog.dev.warehouse":"s3://MYBUCKET/iceberg/"
    }
}
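
If you prefer not to edit the JSON by hand, the body of the `%%configure` cell can be generated. The following is a minimal sketch: the helper name `iceberg_spark_conf` and the bucket name `my-example-bucket` are made up for illustration, and the keys and values are copied from the cell above.

```python
import json


def iceberg_spark_conf(bucket: str, catalog: str = "dev") -> dict:
    """Return the Spark conf shown above with the bucket name filled in."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "conf": {
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
            "spark.sql.catalog.spark_catalog.type": "hive",
            prefix: "org.apache.iceberg.spark.SparkCatalog",
            f"{prefix}.type": "hadoop",
            f"{prefix}.warehouse": f"s3://{bucket}/iceberg/",
        }
    }


# "my-example-bucket" is a placeholder, not a real bucket.
print(json.dumps(iceberg_spark_conf("my-example-bucket"), indent=4))
```

The printed JSON can be pasted after `%%configure -f` in the configuration cell.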

<a id="create_table"></a>
## Creating an Iceberg Table

Create the Iceberg table in the `dev` catalog configured above.


In [None]:
spark.sql(""" DROP TABLE if exists dev.db.amazon_reviews_iceberg""")

spark.sql(""" CREATE TABLE dev.db.amazon_reviews_iceberg 
(marketplace	string
,customer_id	string
,review_id	string
,product_category	string
,product_id	string
,product_parent	string
,product_title	string
,star_rating	int
,helpful_votes	int
,total_votes	int
,vine	string
,verified_purchase	string
,review_headline	string
,review_body	string
,review_date	bigint)

USING iceberg 
location 's3://MYBUCKET/iceberg/db/amazon_reviews_iceberg'
""")

<a id="dml"></a>
## DML Operations
Iceberg supports all DML statements to add or modify data in your data lake: inserts to add new data, updates to modify specific columns in specific rows of your existing data, deletes for GDPR and CCPA compliance, and upserts when incoming data may contain a mix of inserts and updates. Let us look at inserts, deletes, and updates now.
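
Upserts are not demonstrated in the cells of this notebook. As a rough sketch, an upsert could be written with Spark SQL's `MERGE INTO` statement, which requires the Iceberg SQL extensions. The source view `review_updates` and the choice of `review_id` as the match key are assumptions made for this example, and `UPDATE SET *` / `INSERT *` assume the source has the same columns as the target.

```python
def build_merge_sql(target: str, source: str, key: str) -> str:
    """Build an upsert statement: update matching rows, insert the rest."""
    return (
        f"MERGE INTO {target} t\n"
        f"USING {source} s\n"
        f"ON t.{key} = s.{key}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


merge_sql = build_merge_sql(
    target="dev.db.amazon_reviews_iceberg",
    source="review_updates",  # hypothetical temp view holding incoming rows
    key="review_id",          # assumed to identify a review uniquely
)
print(merge_sql)
# In the notebook, the statement would be run with: spark.sql(merge_sql)
```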

<a id="inserts"></a>
### Inserts

**We will be using a simulated Amazon Product Reviews dataset.**

We are loading just one partition for the sake of simplicity.

In [None]:
df = spark.read.parquet(
    "s3://MYBUCKET/productreviews/simulatedproductreviews.parquet"
)

Run the cell below to write the data into the Iceberg table. As with the load, we write just one partition for the sake of simplicity.

In [None]:
df.sortWithinPartitions("review_date").writeTo(
    "dev.db.amazon_reviews_iceberg").append()

Verify that the data was loaded into the Iceberg table successfully.

In [None]:
%%sql
select * from dev.db.amazon_reviews_iceberg limit 9

<a id="deletes"></a>
### Deletes
GDPR and CCPA regulations mandate timely removal of individual customer data and other records from datasets. Iceberg is designed to handle such deletes with a plain SQL statement.
Now let us delete records from our Iceberg table.

Delete all records from the table where verified_purchase = 'N':

In [None]:
spark.sql("""delete from dev.db.amazon_reviews_iceberg
where verified_purchase = 'N'""")

Check that the data is deleted. The query below should return zero records.

In [None]:
spark.sql(
    """select * from dev.db.amazon_reviews_iceberg where verified_purchase = 'N' limit 9"""
).show()

<a id="updates"></a>
### Updates
What if we want to go back and update an existing record? Let's change the `marketplace` from US to USA. Iceberg allows updates using a simple `UPDATE` statement with a `SET` clause.

In [None]:
spark.sql("""UPDATE dev.db.amazon_reviews_iceberg
SET marketplace = 'USA'
WHERE marketplace = 'US'""")

Verify that the `marketplace` column is updated.

In [None]:
%%sql
select * from dev.db.amazon_reviews_iceberg limit 5

<a id="schema_evolution"></a>
## Schema Evolution
Borrowing from the way columns work in databases, Iceberg tracks columns by using unique IDs and not by the column name. As long as the ID is the same, all the data still remains. You can safely add, drop, rename, update, or even reorder columns. You don’t have to rewrite the data for this. Schema evolution gets first class citizen treatment in Iceberg. Your ingest and read queries now have the freedom to be evolved without having to hide the schema inside JSON blobs.
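
The effect of tracking columns by ID can be illustrated with a toy model in plain Python. This is a conceptual sketch only, not how Iceberg stores data: the rows, IDs, and the `read_rows` helper are invented for the example.

```python
# Data files store values keyed by field ID, never by column name.
data_file_rows = [
    {1: "US", 2: 5},
    {1: "US", 2: 3},
]

# The table schema maps field IDs to the current column names.
schema = {1: "marketplace", 2: "star_rating"}


def read_rows(rows, schema):
    """Project stored rows through the current schema; absent IDs read as None."""
    return [{name: row.get(fid) for fid, name in schema.items()} for row in rows]


# Rename: only the name attached to ID 2 changes.
schema[2] = "rating"
# Add: a new ID is assigned; existing files simply lack it.
schema[3] = "high_rated_product"
# Drop: the ID leaves the schema; the stored rows stay untouched.
del schema[1]

print(read_rows(data_file_rows, schema))
```

All three changes touch only the `schema` mapping; `data_file_rows` is never rewritten.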

In this example we will add a column to the Iceberg table we just created, and attach a comment to it.

<a id="adding_columns"></a>
### Adding Columns
Now we are going to add another column called `high_rated_product`. Iceberg also lets us document the purpose of each column with a `comment`, which helps in a collaborative environment and lets business users look up what the data means.

In [None]:
spark.sql(
    """ALTER TABLE dev.db.amazon_reviews_iceberg ADD COLUMNS (high_rated_product string comment 'Highly rated comment')"""
)

We will set the **High rated** flag in the new column where `star_rating` is greater than or equal to 4.

In [None]:
spark.sql(
    """UPDATE dev.db.amazon_reviews_iceberg SET high_rated_product = 'High rated' where star_rating >=4"""
)

Verify that the column was added successfully by querying the table.

In [None]:
%%sql
Select customer_id,review_id,product_id, product_title, star_rating, high_rated_product from dev.db.amazon_reviews_iceberg limit 9

<a id="dropping_columns"></a>
### Dropping Columns
Now suppose business requirements change: we are no longer interested in the `high_rated_product` column and need to remove it from our table. Iceberg allows us to do that easily.

In [None]:
spark.sql(
    """ALTER TABLE dev.db.amazon_reviews_iceberg DROP COLUMN high_rated_product"""
)

<a id="time_travel"></a>
## Time Travel


Iceberg gives us a way to look at the history of changes to our table through the `history` metadata table.

In [None]:
spark.sql("SELECT * FROM dev.db.amazon_reviews_iceberg.history").show()

Iceberg also exposes the table's snapshots through the `snapshots` metadata table.

In [None]:
spark.sql("SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots").show()

<a id="rollback"></a>
### Rollback
To undo recent changes, we can call Iceberg stored procedures with the `CALL` statement. The `rollback_to_snapshot` procedure rolls the table back to any historical commit; `rollback_to_timestamp` does the same using a point in time.

To recover the table's original state, replace <span style="color:red">xxxxxxxxxxxxx</span> with a snapshot ID from the table history. Use the `snapshot_id` of the first record, whose `parent_id` is null.

In [None]:
spark.sql(
    "CALL dev.system.rollback_to_snapshot('db.amazon_reviews_iceberg', xxxxxxxxxxxxx)"
)
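
Instead of copying the snapshot ID by hand, it could be looked up from the history rows. The sketch below assumes the rows are available as dictionaries with the `made_current_at`, `snapshot_id`, and `parent_id` columns of the `history` table; the helper names and the sample IDs are invented for illustration.

```python
from typing import Iterable, Mapping, Optional


def find_root_snapshot_id(history_rows: Iterable[Mapping]) -> Optional[int]:
    """Return the snapshot_id of the earliest history entry whose parent_id is null."""
    for row in sorted(history_rows, key=lambda r: r["made_current_at"]):
        if row["parent_id"] is None:
            return row["snapshot_id"]
    return None


def rollback_statement(catalog: str, table: str, snapshot_id: int) -> str:
    """Build the CALL statement used in the cell above."""
    return f"CALL {catalog}.system.rollback_to_snapshot('{table}', {snapshot_id})"


# Made-up rows in the shape of the history table; the IDs are not real.
sample_history = [
    {"made_current_at": "2024-01-01 10:05:00", "snapshot_id": 222, "parent_id": 111},
    {"made_current_at": "2024-01-01 10:00:00", "snapshot_id": 111, "parent_id": None},
]

root_id = find_root_snapshot_id(sample_history)
print(rollback_statement("dev", "db.amazon_reviews_iceberg", root_id))
```

In the notebook, the rows could be obtained with `[r.asDict() for r in spark.sql("SELECT * FROM dev.db.amazon_reviews_iceberg.history").collect()]`, and the resulting statement passed to `spark.sql`.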

Our table is now back to its original state. You can verify this by checking the `verified_purchase` column, which should show both 'Y' and 'N' values again.

In [None]:
%%sql
select * from dev.db.amazon_reviews_iceberg limit 5

<a id="partition_evolution"></a>
## Partition Evolution
Let us look at the partitions we have in our table by querying the partitions metadata table. Iceberg keeps track of how many records (record_count column) and how many files (file_count column) are present in each partition. This is a very handy tool that could be used for performance and data quality related troubleshooting and diagnostics.

In [None]:
spark.sql("select * from dev.db.amazon_reviews_iceberg.partitions").show()
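
As one example of such diagnostics, the `record_count` and `file_count` columns can be combined to spot partitions made up of many small files. This is a minimal sketch on made-up rows; the helper names and the threshold are arbitrary choices for illustration, and rows without a `partition` value (as for an unpartitioned table) are grouped under a placeholder label.

```python
def records_per_file(partition_rows):
    """Map each partition to its average record count per data file."""
    return {
        str(row.get("partition", "(unpartitioned)")): row["record_count"] / row["file_count"]
        for row in partition_rows
        if row["file_count"] > 0
    }


def small_file_partitions(partition_rows, min_records_per_file):
    """List partitions whose files hold fewer records on average than the threshold."""
    averages = records_per_file(partition_rows)
    return sorted(p for p, avg in averages.items() if avg < min_records_per_file)


# Made-up rows in the shape of the partitions metadata table.
sample = [
    {"partition": "1998", "record_count": 120_000, "file_count": 2},
    {"partition": "2015", "record_count": 900, "file_count": 30},
]
print(small_file_partitions(sample, min_records_per_file=1_000))
```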

Let us list our S3 bucket location to see the partitions. Currently our table has no partitioning, so all data files sit directly under the root data prefix. (Remember to replace <span style="color:red">MYBUCKET</span> with your bucket name and, if you use different prefixes, update the path as applicable.)

In [None]:
%%sh
aws s3 ls s3://MYBUCKET/iceberg/db/amazon_reviews_iceberg/data/


<a id="Create new column in Date format"></a>
### Create new column with Date datatype
Let's create a new column **review_dt**.  
We will use this new column for our new partitions

In [None]:
%%sql
ALTER TABLE dev.db.amazon_reviews_iceberg ADD COLUMNS (review_dt date);

Update the new column with date values

In [None]:
%%sql
UPDATE dev.db.amazon_reviews_iceberg set review_dt = date_add(to_date('1970-01-01'),cast(review_date as integer));
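
The `UPDATE` above treats `review_date` as a number of days since 1970-01-01, which is what the `date_add` expression implies. The same arithmetic in plain Python, as a quick way to sanity-check a value (the function name is ours):

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def epoch_days_to_date(days: int) -> date:
    """Mirror date_add(to_date('1970-01-01'), days) from the cell above."""
    return EPOCH + timedelta(days=days)


print(epoch_days_to_date(0))      # 1970-01-01
print(epoch_days_to_date(10470))  # 1998-09-01
```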

Let's first create **yearly** partitions. Iceberg allows us to add partitions without having to perform any data movement or any additional changes to the underlying data. `ADD PARTITION FIELD` is a simple metadata operation.

In [None]:
%%sql
ALTER TABLE dev.db.amazon_reviews_iceberg add PARTITION FIELD years(review_dt)
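
Partition transforms derive the partition value from the source column, so no extra column has to be stored. In the Iceberg specification, the year transform yields the number of years since 1970 and the month transform the number of months since 1970-01. A plain-Python sketch of that arithmetic (the function names are ours, not an Iceberg API):

```python
from datetime import date


def years_transform(d: date) -> int:
    """Whole years since 1970."""
    return d.year - 1970


def months_transform(d: date) -> int:
    """Whole months since 1970-01."""
    return (d.year - 1970) * 12 + (d.month - 1)


d = date(1998, 9, 15)
print(years_transform(d))   # 28
print(months_transform(d))  # 344
```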

Existing data keeps its old layout. There is no change to the underlying partition structure of existing data, as shown below (again, remember to replace <span style="color:red">MYBUCKET</span> with your bucket name and, if you use different prefixes, update the path as applicable):

In [None]:
%%sh
aws s3 ls s3://MYBUCKET/iceberg/db/amazon_reviews_iceberg/data/

Now let's insert new data for years 1998 and 2015. This will create duplicate rows, but we're just testing what happens when data is inserted after the new partition field is added.

In [None]:
%%sql

INSERT INTO dev.db.amazon_reviews_iceberg
SELECT * FROM dev.db.amazon_reviews_iceberg WHERE year(review_dt)=1998 
union 
SELECT * FROM dev.db.amazon_reviews_iceberg WHERE year(review_dt)=2015 


Now let's check the S3 bucket structure again.  You should now see new directories based on the new partitions.  
Before running the following cell, replace <span style="color:red">MYBUCKET</span>  with your bucket name and if you use different prefixes, update the path as applicable.

In [None]:
%%sh
aws s3 ls s3://MYBUCKET/iceberg/db/amazon_reviews_iceberg/data/

Now assume that, further down the line, we realize we need **monthly** partitions.

In [None]:
%%sql
ALTER TABLE dev.db.amazon_reviews_iceberg ADD PARTITION FIELD months(review_dt)

Let's insert new data after the month partition has been set.

In [None]:
%%sql
INSERT INTO dev.db.amazon_reviews_iceberg
SELECT * FROM dev.db.amazon_reviews_iceberg WHERE year(review_dt)=1998 and month(review_dt)=9
union 
SELECT * FROM dev.db.amazon_reviews_iceberg WHERE year(review_dt)=2000 and month(review_dt)=9


Now let's see the results with the new data written under month partitions. Iceberg adds the new monthly partitions under the year partitions into which we inserted our new records. (Replace <span style="color:red">MYBUCKET</span> with your bucket name and, if you use different prefixes, update the path as applicable.)

In [None]:
%%sh
echo "Top level ------------------------------------------------------------------------------------"
aws s3 ls s3://MYBUCKET/iceberg/db/amazon_reviews_iceberg/data/
echo ""
echo "inside of review_dt_year=1998 directory-------------------------------------------------------"
aws s3 ls s3://MYBUCKET/iceberg/db/amazon_reviews_iceberg/data/review_dt_year=1998/
echo ""
echo "inside of review_dt_year=2000 directory-------------------------------------------------------"
aws s3 ls s3://MYBUCKET/iceberg/db/amazon_reviews_iceberg/data/review_dt_year=2000/
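
The listing reflects which partition spec was active when each file was written: no prefix, a year prefix, or a year prefix followed by a month prefix. The sketch below shows how such a prefix could be derived. The `review_dt_year=` form matches the directories listed above; the `review_dt_month=YYYY-MM` form is an assumption about the naming, so verify it against your own listing. The helper is illustrative and not part of any Iceberg API.

```python
from datetime import date

DATA_ROOT = "s3://MYBUCKET/iceberg/db/amazon_reviews_iceberg/data/"


def partition_prefix(review_dt: date, spec: tuple) -> str:
    """Return the data prefix for a row written under the given partition spec.

    spec is a tuple of transform names: (), ("years",) or ("years", "months").
    """
    parts = []
    for transform in spec:
        if transform == "years":
            parts.append(f"review_dt_year={review_dt.year:04d}")
        elif transform == "months":
            # Assumed naming; check it against the output of the cell above.
            parts.append(f"review_dt_month={review_dt.year:04d}-{review_dt.month:02d}")
        else:
            raise ValueError(f"unsupported transform: {transform}")
    return DATA_ROOT + "".join(p + "/" for p in parts)


d = date(1998, 9, 15)
for spec in [(), ("years",), ("years", "months")]:
    print(partition_prefix(d, spec))
```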

Let us query our table using the new monthly partition.

In [None]:
%%sql

SELECT marketplace, product_title, review_dt FROM dev.db.amazon_reviews_iceberg WHERE year(review_dt)=1998 and month(review_dt)=9 limit 5


We can continue to query our old data using the `year()` function on `review_dt`. There is only the original `review_dt` column in the table; we don't have to store additional columns to accommodate multiple partitioning schemes. Everything is in the metadata, which gives us immense flexibility and makes our data lake forward-looking!

In [None]:
%%sql
    select marketplace, customer_id, product_category, product_title, star_rating, verified_purchase, review_headline, review_dt
    from dev.db.amazon_reviews_iceberg where year(review_dt) = 1999 limit 10


### THANK YOU! ###