# Use the AWS Glue connector to read and write Apache Iceberg tables with ACID transactions and perform time travel
This notebook accompanies the AWS Blog post [Use the AWS Glue connector to read and write Apache Iceberg tables with ACID transactions and perform time travel](#).

## Start an AWS Glue Studio notebook for Iceberg
We start an AWS Glue Studio notebook and load the configuration that you specified in the AWS CloudFormation stack.

### Set up the Iceberg configuration
Set the name of the connection that you created in the previous section.
* `%connections` - type the name of the connection that you created in **Setting up the Apache Iceberg connector and creating the connection**

In [None]:
# Execute this cell to configure and start your notebook.
%idle_timeout 60
# Set your Glue connection name after %connections (e.g. iceberg-connection)
%connections <YOUR_CONNECTION_NAME>
%glue_version 3.0

In [None]:
# Stop the initially created SparkSession and SparkContext so they can be re-created with the Iceberg configuration.
spark.stop()
sc.stop()

In [None]:
# Load the CloudFormation inputs with boto3. Replace `<YOUR_BUCKET_NAME>` with your S3 bucket name.
import json
import boto3

S3_BUCKET = '<YOUR_BUCKET_NAME>'

s3r = boto3.resource('s3')
user_config = json.loads(s3r.Object(bucket_name=S3_BUCKET, key='config/user_config.json').get()['Body'].read())

print(f'Your CloudFormation inputs: {user_config}')
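The cells that follow read four keys from `user_config`: `warehouse_path`, `dynamodb_table`, `database_name`, and `table_name`. If one of them is missing, the error only surfaces later as a `KeyError` in another cell. The sketch below, a hypothetical helper `parse_user_config` that is not part of the original notebook, checks for those keys right after loading. It assumes the file is a flat JSON object.

```python
import json

# Keys that later cells in this notebook read from user_config.
REQUIRED_KEYS = ('warehouse_path', 'dynamodb_table', 'database_name', 'table_name')


def parse_user_config(raw: bytes) -> dict:
    """Parse the raw bytes of config/user_config.json and fail fast on missing keys."""
    config = json.loads(raw)
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError(f'user_config.json is missing keys: {missing}')
    return config
```

In the cell above, you could replace the `json.loads(...)` call with `parse_user_config(s3r.Object(bucket_name=S3_BUCKET, key='config/user_config.json').get()['Body'].read())`.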

### Initialize the job with Iceberg configurations
We initialize Spark with the Iceberg configurations. Before doing so, we specify a Spark catalog name and load the Iceberg warehouse path and the DynamoDB table name from `user_config`, which holds the values you specified in CloudFormation. Then, we create the SparkSession by passing the configurations needed to use Iceberg on AWS Glue, and build a GlueContext on top of it. You can see the details of each configuration in **Appendix: Spark configurations to use Iceberg on AWS Glue**.

In [None]:
# Set your specified resources
CATALOG = 'glue_catalog'
WAREHOUSE_PATH = user_config['warehouse_path']
DYNAMODB_TABLE = user_config['dynamodb_table']

In [None]:
# Initialize SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config(f'spark.sql.catalog.{CATALOG}', 'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{CATALOG}.warehouse', WAREHOUSE_PATH) \
    .config(f'spark.sql.catalog.{CATALOG}.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
    .config(f'spark.sql.catalog.{CATALOG}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config(f'spark.sql.catalog.{CATALOG}.lock-impl', 'org.apache.iceberg.aws.glue.DynamoLockManager') \
    .config(f'spark.sql.catalog.{CATALOG}.lock.table', DYNAMODB_TABLE) \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .getOrCreate()

In [None]:
# Initialize SparkContext and GlueContext
from awsglue.context import GlueContext

sc = spark.sparkContext
gc = GlueContext(sc)
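The same Iceberg settings can also be collected in a plain dictionary, which makes them easy to print, compare with the Appendix, or reuse in another session. The helper `build_iceberg_spark_conf` below is hypothetical and only reproduces the keys and values from the SparkSession cell above; it does not add or validate anything Iceberg-specific.

```python
def build_iceberg_spark_conf(catalog: str, warehouse: str, lock_table: str) -> dict:
    """Return the Spark settings from the SparkSession cell as a plain dict."""
    prefix = f'spark.sql.catalog.{catalog}'
    return {
        prefix: 'org.apache.iceberg.spark.SparkCatalog',
        f'{prefix}.warehouse': warehouse,
        f'{prefix}.catalog-impl': 'org.apache.iceberg.aws.glue.GlueCatalog',
        f'{prefix}.io-impl': 'org.apache.iceberg.aws.s3.S3FileIO',
        f'{prefix}.lock-impl': 'org.apache.iceberg.aws.glue.DynamoLockManager',
        f'{prefix}.lock.table': lock_table,
        'spark.sql.extensions': 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions',
    }
```

To apply it, you could start from `builder = SparkSession.builder`, call `builder = builder.config(key, value)` for each item of `build_iceberg_spark_conf(CATALOG, WAREHOUSE_PATH, DYNAMODB_TABLE)`, and finish with `builder.getOrCreate()`.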

## 1. Create Iceberg tables of customer reviews and BI reports
We create the `acr_iceberg` and `acr_iceberg_report` Iceberg tables. Then, we run a `SELECT` query to check the table records.

### Create the `acr_iceberg` table for customer reviews
We create an Iceberg table named `acr_iceberg` in your specified database in the Glue Data Catalog. When creating the table, Iceberg also writes the actual data to your specified S3 bucket. Note that the script loads only part of the dataset (five product categories) to keep the data loading time short.

In [None]:
DATABASE = user_config['database_name']
TABLE = user_config['table_name']

print(f'Database: {DATABASE}, Table: {TABLE}.')

In [None]:
INPUT_BASE_PATH = 's3://amazon-reviews-pds/parquet'
PRODUCT_CATEGORIES = ['Apparel', 'Camera', 'PC', 'Software', 'Video']
INPUT_CATEGORIES = [f'{INPUT_BASE_PATH}/product_category={category}/' for category in PRODUCT_CATEGORIES]

# Load the dataset and create an Iceberg table. This takes about 5 minutes.
spark.read \
    .option('basePath', INPUT_BASE_PATH) \
    .parquet(*INPUT_CATEGORIES) \
    .writeTo(f'{CATALOG}.{DATABASE}.{TABLE}') \
    .tableProperty('format-version', '2') \
    .create()

### Create the `acr_iceberg_report` Iceberg table for BI reports
We also create `acr_iceberg_report` as an Iceberg table in your specified database in the Glue Data Catalog by running the following SQL statement. This table serves as a BI report for customer review analysis. We query this table in step 4.

<div class="alert alert-block alert-info"><b>NOTE: If you specify a custom database name, replace <code>iceberg_blog_default</code> with the database name in each query hereafter.</b></div>

In [None]:
%%sql
CREATE TABLE glue_catalog.iceberg_blog_default.acr_iceberg_report \
USING iceberg \
AS SELECT comment_count, avg_star, product_category FROM ( \
    VALUES \
        (1240, 4.20729367860598, \'Camera\'), \
        (95, 4.80167540490342, \'Industrial_Supplies\'), \
        (663, 3.80123467540571, \'PC\') \
) AS t(comment_count, avg_star, product_category)

### Determine the average star rating for each product category by querying the Iceberg table
You can see the Iceberg table records by using a `SELECT` statement. In this section, we query the `acr_iceberg` table. In particular, we look at the number of customer comments and the mean star rating for each `product_category`.

In [None]:
%%sql
SELECT \
    count(*) as comment_count, \
    avg(star_rating) as avg_star, \
    product_category \
FROM glue_catalog.iceberg_blog_default.acr_iceberg \
GROUP BY product_category \
ORDER BY product_category, avg_star ASC
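To make the aggregation concrete, the following plain-Python sketch computes the same `comment_count` and `avg_star` per `product_category` as the query above. It models only the result, not how Spark executes the query, and the sample input is limited to the star ratings of the five reviews that step 2 adds. `summarize_by_category` is a hypothetical helper.

```python
from collections import defaultdict


def summarize_by_category(reviews):
    """Return {product_category: (comment_count, avg_star)}, like the GROUP BY query."""
    totals = defaultdict(lambda: [0, 0])  # category -> [count, sum of star_rating]
    for review in reviews:
        bucket = totals[review['product_category']]
        bucket[0] += 1
        bucket[1] += review['star_rating']
    # Sorting the items mirrors ORDER BY product_category.
    return {cat: (count, star_sum / count) for cat, (count, star_sum) in sorted(totals.items())}


# Star ratings of the five Industrial_Supplies reviews added in step 2.
sample = [{'product_category': 'Industrial_Supplies', 'star_rating': s} for s in (5, 3, 2, 5, 4)]
print(summarize_by_category(sample))  # → {'Industrial_Supplies': (5, 3.8)}
```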

## 2. Add customer reviews in the Iceberg table
We add the following five customer comments to the `acr_iceberg` table by running an `INSERT` query on it. The product category of these comments is `Industrial_Supplies`.

Run the following cell to generate the additional customer reviews. They are registered as a temporary view named `additional_reviews`.

In [None]:
# Generate additional records
new_reviews = [
    {'marketplace':'US','customer_id':12345689,'review_id':'ISB35E4556F144','product_id':'I00EDBY7X8','product_parent':989172340,'product_title':'plastic containers','star_rating':5,'helpful_votes':0,'total_votes':0,'vine':'N','verified_purchase':'Y','review_headline':'Five Stars','review_body':'Great product!','review_date':'2022-02-01','year':2022,'product_category':'Industrial_Supplies'},
    {'marketplace':'US','customer_id':78901234,'review_id':'IS4392CD4C3C4','product_id':'I00D7JFOPC','product_parent':952000001,'product_title':'battery tester','star_rating':3,'helpful_votes':0,'total_votes':0,'vine':'N','verified_purchase':'Y','review_headline':'nice one, but it broke some days later','review_body':'nope','review_date':'2022-02-01','year':2022,'product_category':'Industrial_Supplies'},
    {'marketplace':'US','customer_id':12345123,'review_id':'IS97B103F8B24C','product_id':'I002LHA74O','product_parent':818426953,'product_title':'spray bottle','star_rating':2,'helpful_votes':1,'total_votes':1,'vine':'N','verified_purchase':'N','review_headline':'Two Stars','review_body':'the bottle isn\'t as big as pictured.','review_date':'2022-02-01','year':2022,'product_category':'Industrial_Supplies'},
    {'marketplace':'US','customer_id':23000093,'review_id':'ISAB4268D46F3X','product_id':'I00ARPLCGY','product_parent':562945918,'product_title':'3d printer','star_rating':5,'helpful_votes':3,'total_votes':3,'vine':'N','verified_purchase':'Y','review_headline':'Super great','review_body':'very useful','review_date':'2022-02-01','year':2022,'product_category':'Industrial_Supplies'},
    {'marketplace':'US','customer_id':89874312,'review_id':'ISAB4268137V2Y','product_id':'I80ARDQCY','product_parent':564669018,'product_title':'circuit board','star_rating':4,'helpful_votes':0,'total_votes':0,'vine':'Y','verified_purchase':'Y','review_headline':'Great, but a little bit expensive','review_body':'you should buy this, but note the price','review_date':'2022-02-01','year':2022,'product_category':'Industrial_Supplies'}
]

TEMP_TABLE = "additional_reviews"

# Creating a temporary table based on the additional customer reviews.
# The temporary table is only available within the current Spark session, 
#   and the table is removed after the session ends.
from pyspark.sql import Row
from pyspark.sql.functions import to_date
df = spark.createDataFrame([Row(**review) for review in new_reviews])
df.withColumn('review_date', to_date(df.review_date, 'yyyy-MM-dd')) \
    .createOrReplaceTempView(TEMP_TABLE)

Then, insert these records into the Iceberg table by running the following cell. 

In [None]:
%%sql
INSERT INTO glue_catalog.iceberg_blog_default.acr_iceberg SELECT * FROM additional_reviews

You can check the inserted records by running the following query.

In [None]:
%%sql
SELECT \
    count(*) as comment_count, \
    avg(star_rating) as avg_star, \
    product_category \
FROM glue_catalog.iceberg_blog_default.acr_iceberg \
GROUP BY product_category \
ORDER BY product_category, avg_star ASC
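`INSERT INTO ... SELECT *` matches columns by position rather than by name, so each review dictionary must list its keys in the same order as the columns of `acr_iceberg`. Since Spark 3.0, `Row(**kwargs)` keeps the keyword order instead of sorting the field names, so the dictionary order carries through to the temporary view. The sketch below checks that order; `check_column_order` is a hypothetical helper, and `EXPECTED_COLUMNS` is an assumption copied from the key order of the sample records, not read from the table.

```python
# Assumed column order of acr_iceberg, copied from the sample records in step 2.
EXPECTED_COLUMNS = [
    'marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent',
    'product_title', 'star_rating', 'helpful_votes', 'total_votes', 'vine',
    'verified_purchase', 'review_headline', 'review_body', 'review_date',
    'year', 'product_category',
]


def check_column_order(records, expected):
    """Raise ValueError if any record's keys differ from `expected` in name or order."""
    expected = list(expected)
    for i, record in enumerate(records):
        if list(record) != expected:
            raise ValueError(f'record {i} has columns {list(record)}, expected {expected}')
```

In the notebook, you could run `check_column_order(new_reviews, spark.table(f'{CATALOG}.{DATABASE}.{TABLE}').columns)` before the `INSERT` to compare against the actual table schema.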

## 3. Update a customer review in the Iceberg table
Customer `78901234` requests the following updates to the review with ID `IS4392CD4C3C4`:
* change `star_rating` from `3` to `5`
* change `review_headline` from `nice one, but it broke some days later` to `very good`

We update the customer review by running an `UPDATE` query on the `acr_iceberg` table.

In [None]:
%%sql
UPDATE glue_catalog.iceberg_blog_default.acr_iceberg \
SET star_rating = 5, review_headline = \'very good\' \
WHERE customer_id = \'78901234\' AND review_id = \'IS4392CD4C3C4\'

You can see the updated record by running the following cell.

In [None]:
%%sql
SELECT \
    customer_id, \
    review_id, \
    star_rating, \
    review_headline \
FROM glue_catalog.iceberg_blog_default.acr_iceberg \
WHERE customer_id = \'78901234\' AND review_id = \'IS4392CD4C3C4\'

You can also see the updated `avg_star` for the `Industrial_Supplies` category by running the following cell.

In [None]:
%%sql
SELECT \
    count(*) as comment_count, \
    avg(star_rating) as avg_star, \
    product_category \
FROM glue_catalog.iceberg_blog_default.acr_iceberg \
GROUP BY product_category \
ORDER BY product_category, avg_star ASC
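The logical effect of the `UPDATE` in this step, and why `avg_star` for `Industrial_Supplies` moves from 3.8 to 4.2, can be sketched with plain Python dictionaries. This models only the result; it does not show how Iceberg writes new data files. `apply_update` and `mean_star` are hypothetical helpers, the records are trimmed to four fields, and the customer IDs are written as strings to match the quoted literal in the `WHERE` clause.

```python
def apply_update(rows, where, changes):
    """Return a new list of rows with `changes` applied to every row matching `where`.

    The input rows are left untouched, loosely like an Iceberg commit, which
    writes a new snapshot and leaves the previous one readable.
    """
    return [dict(row, **changes) if where(row) else dict(row) for row in rows]


def mean_star(rows):
    """Mean of star_rating over the given rows."""
    return sum(r['star_rating'] for r in rows) / len(rows)


# Trimmed-down copies of the five Industrial_Supplies reviews from step 2.
reviews = [
    {'customer_id': '12345689', 'review_id': 'ISB35E4556F144', 'star_rating': 5, 'review_headline': 'Five Stars'},
    {'customer_id': '78901234', 'review_id': 'IS4392CD4C3C4', 'star_rating': 3, 'review_headline': 'nice one, but it broke some days later'},
    {'customer_id': '12345123', 'review_id': 'IS97B103F8B24C', 'star_rating': 2, 'review_headline': 'Two Stars'},
    {'customer_id': '23000093', 'review_id': 'ISAB4268D46F3X', 'star_rating': 5, 'review_headline': 'Super great'},
    {'customer_id': '89874312', 'review_id': 'ISAB4268137V2Y', 'star_rating': 4, 'review_headline': 'Great, but a little bit expensive'},
]

updated = apply_update(
    reviews,
    where=lambda r: r['customer_id'] == '78901234' and r['review_id'] == 'IS4392CD4C3C4',
    changes={'star_rating': 5, 'review_headline': 'very good'},
)

print(mean_star(reviews), mean_star(updated))  # → 3.8 4.2
```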

## 4. Reflect changes in the customer reviews table in the BI report table with a MERGE INTO query
We reflect the changes in the `acr_iceberg` table into the BI report table `acr_iceberg_report`. To do this, we run a [MERGE INTO](https://iceberg.apache.org/docs/latest/spark-writes/#merge-into) query, which works like an UPSERT (update and insert), and combine the two tables on the `product_category` column of each table. For a matching category, the query adds the comment counts and takes the mean of the two `avg_star` values; a category that is missing from the report is inserted as a new row. Run the following cell.

In [None]:
%%sql
MERGE INTO glue_catalog.iceberg_blog_default.acr_iceberg_report as t1 \
USING ( \
    SELECT \
        count(*) as comment_count, \
        avg(star_rating) as avg_star, \
        product_category \
    FROM glue_catalog.iceberg_blog_default.acr_iceberg \
    GROUP BY product_category \
) AS t2 \
ON t2.product_category = t1.product_category \
WHEN \
    MATCHED THEN UPDATE SET t1.comment_count = t1.comment_count + t2.comment_count, t1.avg_star = (t1.avg_star + t2.avg_star)/2 \
WHEN \
    NOT MATCHED THEN INSERT *

Then, run the following cell to see the report table updated by the `MERGE INTO` query.

In [None]:
%%sql
SELECT * FROM glue_catalog.iceberg_blog_default.acr_iceberg_report \
ORDER BY product_category, avg_star ASC
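The `MERGE INTO` logic can be modeled on dictionaries keyed by `product_category`, which makes the update rule explicit. The hypothetical `merge_report` function below mirrors the SQL: for a matching category it adds the counts and takes the unweighted mean of the two `avg_star` values, and it inserts categories that the report does not have yet. The example uses the initial `Industrial_Supplies` row of the report (95 comments, 4.80167540490342) and the category's aggregate after step 3 (5 comments, 4.2).

```python
def merge_report(report, source):
    """Model the MERGE INTO query on dicts of {category: (comment_count, avg_star)}.

    Matched categories: counts are added and the two averages are averaged
    (unweighted), as in the SQL. Unmatched source categories are inserted.
    Report categories absent from the source are left unchanged.
    """
    merged = dict(report)
    for category, (src_count, src_avg) in source.items():
        if category in merged:
            count, avg = merged[category]
            merged[category] = (count + src_count, (avg + src_avg) / 2)
        else:
            merged[category] = (src_count, src_avg)
    return merged


report = {'Industrial_Supplies': (95, 4.80167540490342), 'PC': (663, 3.80123467540571)}
source = {'Industrial_Supplies': (5, 4.2), 'Video': (10, 4.0)}  # 'Video' row is made up
print(merge_report(report, source))  # Industrial_Supplies: count 100, avg_star approximately 4.50083770245171
```

Because `(t1.avg_star + t2.avg_star)/2` ignores the comment counts, 95 existing comments and 5 new ones get equal weight. If a count-weighted average is what the report should hold, the `SET` clause could use `(t1.comment_count * t1.avg_star + t2.comment_count * t2.avg_star) / (t1.comment_count + t2.comment_count)` instead; the expected values in step 5 assume the unweighted form.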

## 5. Roll back the Iceberg tables and reflect changes in the BI report table
We assume that the customer wants to revert the review update that we made in **Step 3. Update a customer review in the Iceberg table**.
To fulfill the customer’s revert request, we take the following three steps:
1. Check the history of changes to `acr_iceberg` and `acr_iceberg_report` to get the snapshots of each table
2. Roll back `acr_iceberg` to the version created when we inserted records in **Step 2. Add customer reviews in the Iceberg table**, and roll back the `acr_iceberg_report` table to its initial version so that the reverted review can be reflected in it
3. Merge the `acr_iceberg` table into the `acr_iceberg_report` table again

### Get the metadata of each table
We can get the metadata of Iceberg tables by using [inspecting tables queries](https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables).

You can see the `acr_iceberg` table metadata by running the following cell.

In [None]:
%%sql
SELECT \
    h.made_current_at, \
    h.snapshot_id, \
    s.operation \
FROM glue_catalog.iceberg_blog_default.acr_iceberg.history h \
JOIN glue_catalog.iceberg_blog_default.acr_iceberg.snapshots s \
ON h.snapshot_id = s.snapshot_id \
ORDER BY made_current_at DESC

You can see the `acr_iceberg_report` table metadata by running the following cell.

In [None]:
%%sql
SELECT \
    h.made_current_at, \
    h.snapshot_id, \
    s.operation \
FROM glue_catalog.iceberg_blog_default.acr_iceberg_report.history h \
JOIN glue_catalog.iceberg_blog_default.acr_iceberg_report.snapshots s \
ON h.snapshot_id = s.snapshot_id \
ORDER BY made_current_at DESC
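Picking the snapshot IDs for the rollback follows two simple rules: for `acr_iceberg`, take the most recent `append` (the insert from step 2); for `acr_iceberg_report`, take the oldest snapshot. The sketch below applies both rules to rows shaped like the query output above. `pick_snapshots` is a hypothetical helper, the snapshot IDs and timestamps are made up, and the `overwrite` operation for the update is an assumption about what the history shows.

```python
from datetime import datetime


def pick_snapshots(history):
    """history: iterable of (made_current_at, snapshot_id, operation) tuples.

    Returns (latest_append_id, oldest_id): the newest snapshot whose operation
    is 'append', and the oldest snapshot overall.
    """
    ordered = sorted(history, key=lambda entry: entry[0])  # oldest first
    appends = [snap_id for _, snap_id, op in ordered if op == 'append']
    if not appends:
        raise ValueError('no append snapshot found')
    return appends[-1], ordered[0][1]


# Hypothetical acr_iceberg history: table creation, insert (step 2), update (step 3).
acr_history = [
    (datetime(2022, 2, 1, 10, 9), 333, 'overwrite'),
    (datetime(2022, 2, 1, 10, 0), 111, 'append'),
    (datetime(2022, 2, 1, 10, 5), 222, 'append'),
]
print(pick_snapshots(acr_history))  # → (222, 111)
```

In the notebook, you could pass it the rows returned by running the same query through `spark.sql(...).collect()`; each PySpark `Row` is a tuple subclass, so the unpacking works unchanged.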

### Roll back the `acr_iceberg` and `acr_iceberg_report` tables
Based on the snapshot IDs of the `acr_iceberg` and `acr_iceberg_report` tables, we roll back each table to a specific version. Run the following cells to perform the [rollback queries](https://iceberg.apache.org/docs/latest/spark-procedures/) for the two tables.

For the `acr_iceberg` table, run the following cell to revert the table to the `snapshot_id` of the more recent `append` operation (the state right after we inserted the records). Replace the placeholder with that snapshot ID.

In [None]:
%%sql
CALL glue_catalog.system.rollback_to_snapshot(\'iceberg_blog_default.acr_iceberg\', <Type snapshot_id in acr_iceberg table>)

For the `acr_iceberg_report` table, run the following cell to revert the table to the oldest `snapshot_id` (the initial state of the report table).

In [None]:
%%sql
CALL glue_catalog.system.rollback_to_snapshot(\'iceberg_blog_default.acr_iceberg_report\', <Type snapshot_id in acr_iceberg_report table>)
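Instead of typing a snapshot ID into the `%%sql` cells, you could build the same `CALL` statement in a Python cell and pass it to `spark.sql()`. Inside a Python string, the single quotes need no backslash escaping. The helper `rollback_sql` is hypothetical; it only formats the statement and rejects non-integer IDs so that an unreplaced placeholder cannot slip through. The snapshot ID in the example is made up.

```python
def rollback_sql(catalog: str, table: str, snapshot_id: int) -> str:
    """Build a rollback_to_snapshot CALL statement for spark.sql().

    `table` is given as 'database.table'. bool is rejected explicitly because
    it is a subclass of int in Python.
    """
    if isinstance(snapshot_id, bool) or not isinstance(snapshot_id, int):
        raise TypeError('snapshot_id must be an int')
    return f"CALL {catalog}.system.rollback_to_snapshot('{table}', {snapshot_id})"


print(rollback_sql('glue_catalog', 'iceberg_blog_default.acr_iceberg', 1234567890123))
```

For example, `spark.sql(rollback_sql(CATALOG, f'{DATABASE}.{TABLE}', snapshot_id))` would roll back `acr_iceberg`, and `f'{DATABASE}.acr_iceberg_report'` would target the report table, with `snapshot_id` taken from the history query.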

### Reflect changes in `acr_iceberg` into `acr_iceberg_report` again
After rolling back the `acr_iceberg` and `acr_iceberg_report` tables, you can combine them again to reflect the changes in the `acr_iceberg` table into the `acr_iceberg_report` table by running the following cell.

In [None]:
%%sql
MERGE INTO glue_catalog.iceberg_blog_default.acr_iceberg_report as t1 \
USING ( \
    SELECT \
        count(*) as comment_count, \
        avg(star_rating) as avg_star, \
        product_category \
    FROM glue_catalog.iceberg_blog_default.acr_iceberg \
    GROUP BY product_category \
) AS t2 \
ON t2.product_category = t1.product_category \
WHEN \
    MATCHED THEN UPDATE SET t1.comment_count = t1.comment_count + t2.comment_count, t1.avg_star = (t1.avg_star + t2.avg_star)/2 \
WHEN \
    NOT MATCHED THEN INSERT *

Then, check the updated report table by running the following cell. The `avg_star` value of `Industrial_Supplies` changes as follows:
* previous `avg_star` (shown in **Step 4**): `4.50083770245171`
* updated `avg_star`: `4.30083770245171`

In [None]:
%%sql
SELECT * FROM glue_catalog.iceberg_blog_default.acr_iceberg_report \
ORDER BY product_category, avg_star ASC
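Both values follow from the unweighted average in the `MERGE INTO` query, as the short calculation below shows. In step 4, the report's initial `Industrial_Supplies` average (4.80167540490342) was combined with the post-update average of the five new reviews (4.2). After the rollback, the report is back at its initial row and `acr_iceberg` is back at the state after step 2, whose five ratings average 3.8. The comment count is 95 + 5 = 100 in both cases. `merged_row` is a hypothetical helper.

```python
# Inputs taken from the notebook: the initial report row and the five new ratings.
initial_count, initial_avg = 95, 4.80167540490342
ratings_after_insert = [5, 3, 2, 5, 4]  # step 2
ratings_after_update = [5, 5, 2, 5, 4]  # step 3: review IS4392CD4C3C4 changed from 3 to 5


def merged_row(ratings):
    """Apply the MERGE rule: add the counts, take the unweighted mean of the two averages."""
    new_avg = sum(ratings) / len(ratings)
    return initial_count + len(ratings), (initial_avg + new_avg) / 2


print('step 4:', merged_row(ratings_after_update))  # approximately (100, 4.50083770245171)
print('step 5:', merged_row(ratings_after_insert))  # approximately (100, 4.30083770245171)
```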