# Updating a database with deltas using Iceberg and Athena

In this tutorial we are going to demonstrate how to build a database from deltas received from an external source. We will create a table containing all of the raw deltas, and then a second table that shows the state of the source table, as reconstructed from those deltas, at a particular date.

We are going to pretend that we receive a `csv` file containing each day's changes to a table. We will concatenate those deltas into a single "raw" table, and then generate a derived table from the raw deltas.
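As a minimal local sketch of the idea (plain pandas, with two small made-up frames standing in for the daily CSV files), each day's deltas are tagged with the date they were received and stacked into one raw table:

```python
import datetime

import pandas as pd

# Hypothetical daily delta files, cut down to a few columns for illustration
day1 = pd.DataFrame({"employee_id": [1, 5], "manager_id": [17, 17], "record_deleted": [False, False]})
day2 = pd.DataFrame({"employee_id": [5, 6], "manager_id": [18, 17], "record_deleted": [False, False]})


def tag(df: pd.DataFrame, received: datetime.date) -> pd.DataFrame:
    # Return a copy of the delta with the date it was received
    return df.assign(date_received=received)


# The "raw" table is every delta row ever received, stacked together
raw_deltas = pd.concat(
    [tag(day1, datetime.date(2021, 1, 1)), tag(day2, datetime.date(2021, 1, 2))],
    ignore_index=True,
)
print(raw_deltas)
```

Employee 5 appears twice, once for each day their record changed, so working out the current state means picking the latest row per employee.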

In [30]:
%load_ext dotenv
%dotenv



In [31]:
import os
import pandas as pd
import awswrangler as wr
import datetime
import pydbtools as pydb
from scripts.create_dummy_deltas import get_dummy_deltas

## Setup first

In [32]:
# set up your own testing area (set foldername = GH username)
foldername = "soumaya" # GH username
foldername = foldername.lower().replace("-","_")

In [33]:
region = "eu-west-1"
bucketname = "sb-test-bucket-ireland"
db_name = f"aws_example_{foldername}"
db_base_path = f"s3://{bucketname}/{foldername}/database"
s3_base_path = f"s3://{bucketname}/{foldername}/"

# Delete all the s3 files in a given path
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)

# Delete the database if it exists
df_dbs = wr.catalog.databases(limit=1000)
if db_name in df_dbs["Database"].to_list():
    print(f"deleting database {db_name}")
    wr.catalog.delete_database(
        name=db_name
    )

In [34]:
# Update the query dump location if necessary 
pydb.utils.bucket = "mojap-athena-query-dump-sandbox"

### Get the deltas

We are going to create deltas from the `data/employees.csv` table using a script in this repo, `scripts/create_dummy_deltas.py`. What it does isn't important for this tutorial, but you can look at it if you're curious.

In [35]:
deltas = get_dummy_deltas("data/employees.csv")

**Day 1:** The first extract of deltas from the source database.

In [36]:
deltas["day1"]

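|   | employee_id | sex | forename | surname | department_id | manager_id | record_deleted |
|---|---|---|---|---|---|---|---|
| 0 | 1 | M | Dexter | Mitchell | 1.0 | 17 | False |
| 1 | 2 | F | Summer | Bennett | 1.0 | 17 | False |
| 2 | 3 | M | Pip | Carter | 1.0 | 17 | False |
| 3 | 4 | F | Bella | Long | 1.0 | 17 | False |
| 4 | 5 | F | Lexie | Perry |  | 17 | False |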


**Day 2:** The next day's deltas show that Lexie has had their `department_id` and `manager_id` corrected, and that there are 2 new employees.

In [37]:
deltas["day2"]

|   | employee_id | sex | forename | surname | department_id | manager_id | record_deleted |
|---|---|---|---|---|---|---|---|
| 0 | 5 | F | Lexie | Perry | 2 | 18 | False |
| 1 | 6 | M | Robert | Roberts | 1 | 17 | False |
| 2 | 7 | F | Iris | Alexander | 1 | 17 | False |


**Day 3:** The next day's deltas show that:
- Dexter has left (his record is marked as deleted)
- Iris has moved to department 2 and now works for Lexie
- 3 new employees are also now working for Lexie


In [38]:
deltas["day3"]

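|   | employee_id | sex | forename | surname | department_id | manager_id | record_deleted |
|---|---|---|---|---|---|---|---|
| 0 | 1 | M | Dexter | Mitchell | 1 | 17 | True |
| 1 | 7 | F | Iris | Alexander | 2 | 5 | False |
| 2 | 9 | M | Evan | Carter | 2 | 5 | False |
| 3 | 10 | F | Lauren | Powell | 2 | 5 | False |
| 4 | 11 | F | Alice | James | 2 | 5 | False |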


### Create a database and tables

There are many ways you can create a database and tables (see other tutorials). For this example we will use awswrangler (which infers the table schema from the data).
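One consequence of schema inference shows up in the outputs: on day 1 `department_id` is displayed as `1.0` because Lexie's value is missing, so pandas stores the whole column as `float64`. A small sketch, assuming you want the column to stay integral, is to cast it to pandas' nullable `Int64` dtype before writing. awswrangler should then infer an integer type rather than `double`, but check the resulting schema in the Glue catalog to confirm.

```python
import pandas as pd

# Day 1 style data: Lexie's department_id is missing
df = pd.DataFrame({"employee_id": [1, 5], "department_id": [1, None]})
print(df["department_id"].dtype)  # float64, which is why 1 is shown as 1.0

# The nullable integer dtype keeps whole numbers and stores the gap as <NA>
df["department_id"] = df["department_id"].astype("Int64")
print(df["department_id"].dtype)
print(df["department_id"].tolist())
```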


In [39]:
# Init database and delta table
wr.catalog.create_database(name=db_name)

# Add a date_received column so we know when each delta arrived
df = deltas["day1"]
df["date_received"] = datetime.date(2021,1,1)

# We are going to name the folder the same as our table;
# this keeps things simple and is advised
table_name = "raw_deltas"
raw_delta_path = os.path.join(
    db_base_path,
    table_name
)
_ = wr.s3.to_parquet(
    df,
    path=raw_delta_path,
    dataset=True,
    database=db_name,
    table=table_name,
    mode="append"
)

In [40]:
sql = f"SELECT * FROM {db_name}.{table_name}"
print(sql)
pydb.read_sql_query(sql, ctas_approach=False)

SELECT * FROM aws_example_soumaya.raw_deltas


|   | employee_id | sex | forename | surname | department_id | manager_id | record_deleted | date_received |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | M | Dexter | Mitchell | 1.0 | 17 | False | 2021-01-01 |
| 1 | 2 | F | Summer | Bennett | 1.0 | 17 | False | 2021-01-01 |
| 2 | 3 | M | Pip | Carter | 1.0 | 17 | False | 2021-01-01 |
| 3 | 4 | F | Bella | Long | 1.0 | 17 | False | 2021-01-01 |
| 4 | 5 | F | Lexie | Perry |  | 17 | False | 2021-01-01 |


### Take stock

We have now created our database and initialised the `raw_deltas` table within it.

Next we are going to create an Iceberg table using Athena. This table will show the state of the employees table, as reconstructed from `raw_deltas`, on each day we run an update.

> We are also going to wrap these code chunks in functions, so that we can reuse them later to show how you can apply a delta update and then refresh the downstream tables.

### Athena Iceberg derived table

To start off, we need to create an empty Iceberg table that is registered with the AWS Glue catalog. We'll do this by sending a `CREATE TABLE` query to Athena.
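If you create several Iceberg tables, it can help to generate the DDL from a column mapping so that the table name and its S3 location always agree. `build_iceberg_ddl` below is a hypothetical helper (not part of pydbtools or awswrangler); it only builds the SQL string, which you would then pass to `pydb.start_query_execution_and_wait`.

```python
def build_iceberg_ddl(database: str, table: str, columns: dict[str, str], location: str) -> str:
    """Return an Athena CREATE TABLE statement for an Iceberg table (hypothetical helper)."""
    column_sql = ",\n    ".join(f"{name} {col_type}" for name, col_type in columns.items())
    return (
        f"CREATE TABLE {database}.{table} (\n    {column_sql}\n)\n"
        # Normalise to exactly one trailing slash, as in the tutorial's LOCATION
        f"LOCATION '{location.rstrip('/')}/'\n"
        "TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet')"
    )


employee_columns = {
    "employee_id": "int",
    "sex": "string",
    "forename": "string",
    "surname": "string",
    "department_id": "int",
    "manager_id": "int",
    "record_created": "date",
    "record_last_updated": "date",
}
ddl = build_iceberg_ddl(
    "aws_example_soumaya",
    "employee_athena_iceberg",
    employee_columns,
    "s3://sb-test-bucket-ireland/soumaya/database/employee_athena_iceberg",
)
print(ddl)
```

This produces the same columns and table properties as the query in the next cell.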

In [41]:
def create_empty_iceberg_table(table_name: str):
    table_path = os.path.join(
        db_base_path,
        table_name,
    )

    create_table_sql = f"""
    CREATE TABLE {db_name}.{table_name} (
    employee_id int,
    sex string,
    forename string,
    surname string,
    department_id int,
    manager_id int,
    record_created date,
    record_last_updated date)
    LOCATION '{table_path}/'
    TBLPROPERTIES (
        'table_type'='ICEBERG',
        'format'='parquet'
    )
    """

    try:
        _ = pydb.start_query_execution_and_wait(create_table_sql)
    except Exception as e:
        if "Iceberg table to be created already exists" not in str(e):
            raise
        print("Iceberg table to be created already exists")


Now let's create the table.

In [42]:
iceberg_table_name = "employee_athena_iceberg"
create_empty_iceberg_table(iceberg_table_name)

Let's query our empty table to see that it's been created correctly.

In [43]:
sql = f"SELECT * FROM {db_name}.{iceberg_table_name}"
print(sql)
pydb.read_sql_query(sql, ctas_approach=False)

SELECT * FROM aws_example_soumaya.employee_athena_iceberg


|   | employee_id | sex | forename | surname | department_id | manager_id | record_created | record_last_updated |
|---|---|---|---|---|---|---|---|---|


We're now going to create a function that uses our `raw_deltas` table to:
* insert new records
* delete records that are marked as deleted in the `record_deleted` column
* update the `manager_id` and `department_id` fields if either of them has changed

We'll do all of this with Athena's `MERGE INTO` SQL command for Iceberg tables. For each employee, the query first picks the most recent delta received on or before the report date, then merges that row into the Iceberg table.
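Before running the `MERGE` against Athena, it can help to reason about it locally. The sketch below is a hypothetical pure-pandas emulation of the same rules (it is not what Athena executes): keep each employee's latest delta up to the report date, then delete, update, or insert. The `distinct` helper treats two missing values as equal and a missing versus present value as a change, like SQL's `IS DISTINCT FROM`, so Lexie's missing `department_id` counts as changed. The insert rule only adds employees whose latest delta is not a deletion; without that condition, re-running a report after Dexter's deletion would add him back.

```python
import datetime

import pandas as pd

COLUMNS = ["employee_id", "department_id", "manager_id", "record_created", "record_last_updated"]


def distinct(a, b) -> bool:
    # Mirrors SQL "a IS DISTINCT FROM b": two missing values count as equal
    if pd.isna(a) and pd.isna(b):
        return False
    if pd.isna(a) or pd.isna(b):
        return True
    return a != b


def merge_report(target: pd.DataFrame, raw: pd.DataFrame, report_date: datetime.date) -> pd.DataFrame:
    # Latest delta per employee received on or before report_date (the rn = 1 rows)
    recent = raw[raw["date_received"] <= report_date]
    src = recent.sort_values("date_received", kind="stable").groupby("employee_id").tail(1)
    src = src.set_index("employee_id")
    out = target.set_index("employee_id")

    # WHEN MATCHED AND record_deleted THEN DELETE
    deleted_ids = src[src["record_deleted"]].index
    out = out.drop(index=out.index.intersection(deleted_ids))

    # WHEN MATCHED AND (department or manager changed) THEN UPDATE
    live = src[~src["record_deleted"]]
    for emp in out.index.intersection(live.index):
        row = live.loc[emp]
        if distinct(out.at[emp, "department_id"], row["department_id"]) or distinct(
            out.at[emp, "manager_id"], row["manager_id"]
        ):
            out.at[emp, "department_id"] = row["department_id"]
            out.at[emp, "manager_id"] = row["manager_id"]
            out.at[emp, "record_last_updated"] = report_date

    # WHEN NOT MATCHED (and not deleted) THEN INSERT
    new_ids = live.index.difference(out.index)
    inserts = live.loc[new_ids, ["department_id", "manager_id"]].assign(
        record_created=report_date, record_last_updated=report_date
    )
    parts = [frame for frame in (out, inserts) if not frame.empty]
    out = pd.concat(parts) if parts else out
    return out.sort_index().rename_axis("employee_id").reset_index()


# The tutorial's three days of deltas, cut down to the columns the merge uses
d1, d2, d3 = (datetime.date(2021, 1, day) for day in (1, 2, 3))
rows = [  # (employee_id, department_id, manager_id, record_deleted, date_received)
    (1, 1.0, 17, False, d1), (2, 1.0, 17, False, d1), (3, 1.0, 17, False, d1),
    (4, 1.0, 17, False, d1), (5, None, 17, False, d1),
    (5, 2.0, 18, False, d2), (6, 1.0, 17, False, d2), (7, 1.0, 17, False, d2),
    (1, 1.0, 17, True, d3), (7, 2.0, 5, False, d3), (9, 2.0, 5, False, d3),
    (10, 2.0, 5, False, d3), (11, 2.0, 5, False, d3),
]
raw = pd.DataFrame(rows, columns=["employee_id", "department_id", "manager_id", "record_deleted", "date_received"])

report = pd.DataFrame(columns=COLUMNS)
for day in (d1, d2, d3):
    report = merge_report(report, raw, day)
print(report)
```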

In [44]:
def create_report_athena_iceberg(report_date: str, table_name: str):
    full_sql = f"""
    MERGE INTO {db_name}.{table_name} t USING (
        SELECT 
            employee_id,
            sex,
            forename,
            surname,
            department_id,
            manager_id,
            date '{report_date}' AS record_last_updated,
            record_deleted
        FROM
        (
            SELECT *,
            row_number() OVER (PARTITION BY employee_id ORDER BY date_received DESC) as rn
            FROM {db_name}.raw_deltas
            WHERE date_received <= date '{report_date}'
        )
        WHERE rn = 1
    ) s ON (t.employee_id = s.employee_id)
        WHEN MATCHED AND s.record_deleted
            THEN DELETE
        WHEN MATCHED AND NOT s.record_deleted AND (t.department_id IS DISTINCT FROM s.department_id OR t.manager_id IS DISTINCT FROM s.manager_id)
            THEN UPDATE
                SET department_id = s.department_id, manager_id = s.manager_id, record_last_updated = s.record_last_updated
        WHEN NOT MATCHED AND NOT s.record_deleted
            THEN INSERT
                (employee_id, sex, forename, surname, department_id, manager_id, record_created, record_last_updated)
                    VALUES (s.employee_id, s.sex, s.forename, s.surname, s.department_id, s.manager_id, s.record_last_updated, s.record_last_updated)
    """

    # run the query
    pydb.start_query_execution_and_wait(full_sql)

In [45]:
# Run code to create report for 2021-01-01 data
create_report_athena_iceberg("2021-01-01", iceberg_table_name)

In [46]:
sql = f"SELECT * FROM {db_name}.{iceberg_table_name}"
print(sql)
pydb.read_sql_query(sql, ctas_approach=False).sort_values(by=["employee_id"])

SELECT * FROM aws_example_soumaya.employee_athena_iceberg


|   | employee_id | sex | forename | surname | department_id | manager_id | record_created | record_last_updated |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | M | Dexter | Mitchell | 1.0 | 17 | 2021-01-01 | 2021-01-01 |
| 1 | 2 | F | Summer | Bennett | 1.0 | 17 | 2021-01-01 | 2021-01-01 |
| 4 | 3 | M | Pip | Carter | 1.0 | 17 | 2021-01-01 | 2021-01-01 |
| 3 | 4 | F | Bella | Long | 1.0 | 17 | 2021-01-01 | 2021-01-01 |
| 2 | 5 | F | Lexie | Perry |  | 17 | 2021-01-01 | 2021-01-01 |


### Final bit

Now we have 2 tables:

- `raw_deltas`: a table of all the raw deltas concatenated together
- `employee_athena_iceberg`: a report of what the employees table looked like at a given point in time. (Remember that in this example, `raw_deltas` comes from an external `employees` table for which we are given daily deltas of changes.)

Now we want to update each of these tables with the data from day 2, and then do it again with day 3's data. Let's start with day 2.
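Since each daily update is the same two steps, they could be wrapped in one function. The sketch below is hypothetical: `run_daily_update` takes the append step and the report step as callables, so you could pass thin wrappers around the tutorial's own calls, or exercise the ordering locally with stand-ins as shown.

```python
import datetime
from typing import Callable

import pandas as pd


def run_daily_update(
    delta: pd.DataFrame,
    received: datetime.date,
    append_raw: Callable[[pd.DataFrame], None],
    run_report: Callable[[str], None],
) -> None:
    # 1. Tag a copy of the delta with its received date and append it to raw_deltas
    append_raw(delta.assign(date_received=received))
    # 2. Rebuild the report as of the same date, now that raw_deltas includes it
    run_report(received.isoformat())


# Local stand-ins that just record what would have been called
calls = []
run_daily_update(
    pd.DataFrame({"employee_id": [5]}),
    datetime.date(2021, 1, 2),
    append_raw=lambda df: calls.append(("append", len(df), df["date_received"].iloc[0])),
    run_report=lambda day: calls.append(("report", day)),
)
print(calls)
```

Against AWS, `append_raw` could be `lambda df: wr.s3.to_parquet(df, path=raw_delta_path, dataset=True, database=db_name, table="raw_deltas", mode="append")` and `run_report` could be `lambda day: create_report_athena_iceberg(day, iceberg_table_name)`. Using `assign` also avoids mutating the DataFrame stored in `deltas`.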

### Day 2

Add day 2's data to the `raw_deltas` table:

In [47]:
df = deltas["day2"]
df["date_received"] = datetime.date(2021,1,2)

_ = wr.s3.to_parquet(
    df,
    path=raw_delta_path,
    dataset=True,
    database=db_name,
    table=table_name,
    mode="append"
)

Then run the report for the same date (now that the deltas table has been updated):

In [48]:
create_report_athena_iceberg("2021-01-02", iceberg_table_name)  # MERGE applies the day 2 deltas to the existing table

In [49]:
sql = f"""
SELECT *
FROM {db_name}.{iceberg_table_name}
"""
print(sql)
pydb.read_sql_query(sql, ctas_approach=False).sort_values(by=["employee_id"])


SELECT *
FROM aws_example_soumaya.employee_athena_iceberg



|   | employee_id | sex | forename | surname | department_id | manager_id | record_created | record_last_updated |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | M | Dexter | Mitchell | 1 | 17 | 2021-01-01 | 2021-01-01 |
| 1 | 2 | F | Summer | Bennett | 1 | 17 | 2021-01-01 | 2021-01-01 |
| 2 | 3 | M | Pip | Carter | 1 | 17 | 2021-01-01 | 2021-01-01 |
| 6 | 4 | F | Bella | Long | 1 | 17 | 2021-01-01 | 2021-01-01 |
| 5 | 5 | F | Lexie | Perry | 2 | 18 | 2021-01-01 | 2021-01-02 |
| 3 | 6 | M | Robert | Roberts | 1 | 17 | 2021-01-02 | 2021-01-02 |
| 4 | 7 | F | Iris | Alexander | 1 | 17 | 2021-01-02 | 2021-01-02 |


As we can see, the new employees have been added and Lexie's department and manager have been updated, as expected.

It is also worth noting that the previous state of the table has not been lost: each `MERGE` creates a new Iceberg snapshot, so earlier versions can still be queried (see the time travel section below).

### Day 3

Let's do the same again for day 3. The code is identical to day 2's, apart from the new date.

In [50]:
# update raw deltas first
df = deltas["day3"]
df["date_received"] = datetime.date(2021,1,3)

_ = wr.s3.to_parquet(
    df,
    path=raw_delta_path,
    dataset=True,
    database=db_name,
    table=table_name,
    mode="append"
)

# Then run reports
create_report_athena_iceberg("2021-01-03", iceberg_table_name)

In [51]:
sql = f"""
SELECT *
FROM {db_name}.{iceberg_table_name}
"""
print(sql)
pydb.read_sql_query(sql, ctas_approach=False).sort_values(by=["employee_id"])


SELECT *
FROM aws_example_soumaya.employee_athena_iceberg



|   | employee_id | sex | forename | surname | department_id | manager_id | record_created | record_last_updated |
|---|---|---|---|---|---|---|---|---|
| 5 | 2 | F | Summer | Bennett | 1 | 17 | 2021-01-01 | 2021-01-01 |
| 4 | 3 | M | Pip | Carter | 1 | 17 | 2021-01-01 | 2021-01-01 |
| 6 | 4 | F | Bella | Long | 1 | 17 | 2021-01-01 | 2021-01-01 |
| 8 | 5 | F | Lexie | Perry | 2 | 18 | 2021-01-01 | 2021-01-02 |
| 7 | 6 | M | Robert | Roberts | 1 | 17 | 2021-01-02 | 2021-01-02 |
| 3 | 7 | F | Iris | Alexander | 2 | 5 | 2021-01-02 | 2021-01-03 |
| 2 | 9 | M | Evan | Carter | 2 | 5 | 2021-01-03 | 2021-01-03 |
| 0 | 10 | F | Lauren | Powell | 2 | 5 | 2021-01-03 | 2021-01-03 |
| 1 | 11 | F | Alice | James | 2 | 5 | 2021-01-03 | 2021-01-03 |


From the above we can see that Dexter has been removed from the report (as he left), Iris's department and manager have been updated, and the new staff have been added, all as expected from our original deltas.

### Performing Time Travel

As we're using Iceberg, we can perform time travel on our table to view its state at a given point in time. To find out when the table was updated (and so which times we can travel to), we can query the Iceberg table's history as follows:

In [23]:
history = pydb.read_sql_query(f'SELECT * FROM "{db_name}"."{iceberg_table_name}$history"', ctas_approach=False)
history

|   | made_current_at | snapshot_id | parent_id | is_current_ancestor |
|---|---|---|---|---|
| 0 | 2023-06-08 16:25:30.021000+00:00 | 8053548908685702207 |  | True |
| 1 | 2023-06-08 16:26:04.348000+00:00 | 2982069975439286703 | 1.9048735725918449e+18 | True |
| 2 | 2023-06-08 16:26:20.364000+00:00 | 5677713241027579376 | 4.07795625627746e+17 | True |


Let's pick a time between the penultimate and latest changes (one second after the second snapshot was made current) and query the table as it was then.

In [25]:
timestamp = (history.made_current_at[1] + pd.to_timedelta(1, unit='s')).strftime("%Y-%m-%d %H:%M:%S")
sql = f"SELECT * FROM {db_name}.{iceberg_table_name} FOR TIMESTAMP AS OF TIMESTAMP '{timestamp} UTC'"
pydb.read_sql_query(sql, ctas_approach=False)

|   | employee_id | sex | forename | surname | department_id | manager_id | record_created | record_last_updated |
|---|---|---|---|---|---|---|---|---|
| 0 | 2 | F | Summer | Bennett | 1 | 17 | 2021-01-01 | 2021-01-01 |
| 1 | 3 | M | Pip | Carter | 1 | 17 | 2021-01-01 | 2021-01-01 |
| 2 | 6 | M | Robert | Roberts | 1 | 17 | 2021-01-02 | 2021-01-02 |
| 3 | 7 | F | Iris | Alexander | 1 | 17 | 2021-01-02 | 2021-01-02 |
| 4 | 5 | F | Lexie | Perry | 2 | 18 | 2021-01-01 | 2021-01-02 |
| 5 | 1 | M | Dexter | Mitchell | 1 | 17 | 2021-01-01 | 2021-01-01 |
| 6 | 4 | F | Bella | Long | 1 | 17 | 2021-01-01 | 2021-01-01 |
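Besides timestamps, Athena can time travel to a specific snapshot with `FOR VERSION AS OF <snapshot_id>`. If you do this, take the ID from the integer `snapshot_id` column: `parent_id` appears to come back as floats (its first value is missing), and 19-digit IDs cannot be stored exactly in a 64-bit float. The `version_query` helper below is hypothetical and only builds the SQL string, which you would pass to `pydb.read_sql_query(sql, ctas_approach=False)` as in the cells above.

```python
def version_query(database: str, table: str, snapshot_id: int) -> str:
    # Build an Athena Iceberg time-travel query pinned to one snapshot (hypothetical helper)
    return f"SELECT * FROM {database}.{table} FOR VERSION AS OF {int(snapshot_id)}"


snapshot_id = 2982069975439286703  # snapshot_id of the second row in the history output
print(version_query("aws_example_soumaya", "employee_athena_iceberg", snapshot_id))

# A float64 has a 53-bit mantissa, so an ID this large cannot survive a round trip
print(int(float(snapshot_id)) == snapshot_id)  # False
```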


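We can also optimise how our Iceberg table is stored by running an `OPTIMIZE` command, which rewrites the table's data files (here using the `BIN_PACK` strategy) into a more efficient layout.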
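Conceptually, bin packing groups many small data files (such as those left behind by repeated `MERGE` runs) into fewer files close to a target size. The sketch below only illustrates that idea with a simple first-fit-decreasing heuristic and made-up file sizes; it is an assumption for illustration, not how Athena implements `OPTIMIZE`.

```python
def bin_pack(file_sizes: list[int], target_size: int) -> list[list[int]]:
    """Group file sizes into bins of at most target_size (first-fit decreasing)."""
    bins: list[list[int]] = []
    totals: list[int] = []
    for size in sorted(file_sizes, reverse=True):
        # Put the file into the first bin that still has room for it
        for i, total in enumerate(totals):
            if total + size <= target_size:
                bins[i].append(size)
                totals[i] += size
                break
        else:
            # No existing bin has room, so start a new output file
            bins.append([size])
            totals.append(size)
    return bins


# Hypothetical sizes (in MB) of small files written by several MERGE runs
small_files = [40, 10, 60, 30, 20, 70, 50]
print(bin_pack(small_files, target_size=128))  # [[70, 50], [60, 40, 20], [30, 10]]
```

Seven small files become three larger ones here; fewer files generally means less per-file overhead when the table is read.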

In [26]:
pydb.start_query_execution_and_wait(f"OPTIMIZE {db_name}.{iceberg_table_name} REWRITE DATA USING BIN_PACK")

{'QueryExecutionId': '4111250b-e071-4476-9aea-665231a9884e',
 'Query': 'OPTIMIZE aws_example_soumaya.employee_athena_iceberg REWRITE DATA USING BIN_PACK',
 'StatementType': 'DDL',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-684969100054-eu-west-1/tables/4111250b-e071-4476-9aea-665231a9884e'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {},
 'Status': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2023, 6, 8, 17, 27, 40, 112000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2023, 6, 8, 17, 27, 42, 992000, tzinfo=tzlocal())},
 'Statistics': {'EngineExecutionTimeInMillis': 2613,
  'DataScannedInBytes': 2089,
  'DataManifestLocation': 's3://aws-athena-query-results-684969100054-eu-west-1/tables/4111250b-e071-4476-9aea-665231a9884e-manifest.csv',
  'TotalExecutionTimeInMillis': 2880,
  'QueryQueueTimeInMillis': 240,
  'QueryPlanningTimeInMillis': 430,
  'ServicePr

Note that this creates a new version (snapshot) of our table, as the history shows:

In [27]:
history = pydb.read_sql_query(f'SELECT * FROM "{db_name}"."{iceberg_table_name}$history"', ctas_approach=False)
history

|   | made_current_at | snapshot_id | parent_id | is_current_ancestor |
|---|---|---|---|---|
| 0 | 2023-06-08 16:25:30.021000+00:00 | 8053548908685702207 |  | True |
| 1 | 2023-06-08 16:26:04.348000+00:00 | 2982069975439286703 | 1.9048735725918449e+18 | True |
| 2 | 2023-06-08 16:26:20.364000+00:00 | 5677713241027579376 | 4.07795625627746e+17 | True |
| 3 | 2023-06-08 16:27:42.503000+00:00 | 2354635518523590439 | 5.677713241027579e+18 | True |


### Wrapping Up

Hopefully that was useful. Finally, let's delete everything we created.

In [52]:
### Clean up

# Delete all the s3 files in a given path
if wr.s3.list_objects(s3_base_path):
    print("deleting objs")
    wr.s3.delete_objects(s3_base_path)

# Delete the database if it exists
df_dbs = wr.catalog.databases(limit=1000)
if db_name in df_dbs["Database"].to_list():
    print("Deleting database")
    wr.catalog.delete_database(
        name=db_name
    )

deleting objs
Deleting database
