![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)

### [Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!](https://tabular.io/blog/docker-spark-and-iceberg/)

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

spark

24/06/29 11:44:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Load One Month of NYC Taxi/Limousine Trip Data

For this notebook, we will use the New York City Taxi and Limousine Commission Trip Record Data that's available on the AWS Open Data Registry. It contains records of trips taken by taxis and for-hire vehicles in New York City. We'll save it into an Iceberg table called `taxis`.

To be able to rerun the notebook several times, let's drop the table if it exists to start fresh.

In [2]:
%%sql

CREATE DATABASE IF NOT EXISTS nyc

In [3]:
%%sql

DROP TABLE IF EXISTS nyc.taxis

In [4]:
df = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2021-04.parquet")
df.write.saveAsTable("nyc.taxis")

                                                                                

In [5]:
%%sql

DESCRIBE EXTENDED nyc.taxis

col_name,data_type,comment
VendorID,bigint,
tpep_pickup_datetime,timestamp_ntz,
tpep_dropoff_datetime,timestamp_ntz,
passenger_count,double,
trip_distance,double,
RatecodeID,double,
store_and_fwd_flag,string,
PULocationID,bigint,
DOLocationID,bigint,
payment_type,bigint,


In [7]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis

cnt
2171187


In [9]:
%pip install faker

Collecting faker
  Downloading Faker-26.0.0-py3-none-any.whl (1.8 MB)
Installing collected packages: faker
Successfully installed faker-26.0.0
Note: you may need to restart the kernel to use updated packages.


In [10]:
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, expr
from datetime import datetime as dt
from faker import Faker
from uuid import uuid1

fake = Faker()
GLOBAL_TEST_PATH = 'tests/test_data'
DATA_PATHS = {
    'DMS': {
        'SOURCE': f'{GLOBAL_TEST_PATH}/db_data',
        'SINK': f'{GLOBAL_TEST_PATH}/dms_sink'
    }
}
TABLES = {
    1: {
        'name': 'demo',
    }
}


In [30]:
def create_mock_dimension_table_dataframe(spark, row_count=10):
    """ Create mock data for a dimension table
    Builds row_count fake records with Faker and returns them as a
    Spark DataFrame with an added uuid column. Nothing is written to
    disk here; the load_* helpers persist the data.
    :param spark: Spark session object
    :param row_count: number of rows to generate
    :return: Spark DataFrame
    """
    ts = dt.now()
    local_records = [
        Row(
            name=fake.name(),
            address=fake.address(),
            license_number=fake.license_plate(),
            iban=fake.iban(),
            bs=fake.bs(),
            catch_phrase=fake.catch_phrase(),
            company=fake.company(),
            paragraph=fake.paragraph(nb_sentences=10),
            created_at=fake.date_time(),
            updated_at=ts
        )
        for i in range(row_count)
    ]
    df = spark.createDataFrame(local_records)
    df = df.withColumn("uuid", expr("uuid()"))

    print(f"""
        dataframe of count {row_count} 
        was created from faker as {ts}
    """)
    return df


In [31]:
def get_location(actor, io, table_name, action):
    """ Get Location
    <base path for actor and io>/<table_name>/<action>/<timestamp>

    :param actor: executor that called the function (key in DATA_PATHS)
    :param io: intended SOURCE or SINK
    :param table_name: name of the table that the location
        is associated with
    :param action: key event that caused the data creation
    :return: Location
    """
    ts_str = dt.now().strftime("%Y_%m_%d-%I:%M:%S_%p")
    loc = f'{DATA_PATHS[actor][io]}/{table_name}/{action}/{ts_str}'
    print(f'''
        Location: {loc}
            for {actor}
            on {io} 
            for {table_name} 
            on {action}
            at {ts_str}
    ''')
    return loc

def extract_parquet(spark, loc):
    """Load data from Parquet file format.

    :param spark: Spark session object.
    :param loc: Location of data
    :return: Spark DataFrame.
    """
    df = (
        spark
        .read
        .parquet(loc))
    print(f"""
        dataframe of count {df.count()} '
        was read from location {loc}'
    """)
    return df


def load_parquet(df, loc, num_of_output_files=1):
    """Load data to Parquet file format.

    :param df: Spark Dataframe
    :param loc: Location of data
    :param num_of_output_files: number of files
    :return: None
    """

    (df
     .coalesce(num_of_output_files)
     .write
     .parquet(loc, mode='overwrite')
     )

    print(f'''
        Successfully written {num_of_output_files} file
        for on location {loc}
    ''')


def load_table(df, database_name, table_name):
    """ Write a table to the associated catalog

    :param df: Dataframe to write
    :param database_name: namespace as in catalog
    :param table_name: table's name for reference
    :return: None
    """
    df.write.saveAsTable(f"{database_name}.{table_name}")
    print(f' Successfully added table {database_name}.{table_name} to catalog ')

def load_mock_db_initial_state(spark, df, table_name):
    """ Write mock initial data for a dimension table
    This function writes the mock initial state of a dimension table
    that stores names of all entities, saved as Parquet files in
    tests/test_data. This will be used for unit tests as well as to
    load as a part of the example ingestion job.
    :param spark: Spark session object (unused)
    :param df: Dataframe to write
    :param table_name: name of the table
    :return: Location
    """

    location = get_location(
        actor='DMS',
        io='SOURCE',
        table_name=table_name,
        action='initial_state'
    )
    load_parquet(
        df=df,
        loc=location
    )

    print(f'''
        Successfully written Mock DB initial state
        for table {table_name} on location {location}
    ''')
    return location


In [32]:
def load_mock_dms_full_load(table_name, df):
    """ Mock AWS DMS full load
    :param table_name: Source Table Name
    :param df: Source Table Data
    """
    df_initial_load = (
        df
        .withColumn(
            "Op",
            lit('I')
        )
        .withColumn(
            'dms_export_timestamp',
            current_timestamp()
        )
    )
    location = get_location(
        actor='DMS',
        io='SINK',
        table_name=table_name,
        action='full_load'
    )
    load_parquet(
        df=df_initial_load,
        loc=location
    )
    print(f'''
        Successfully written Mock DMS full initial load
        for table {table_name} on location {location}
    ''')

    return location


In [45]:
def load_mock_dms_cdc_load(table_name, df):
    """ Mock AWS DMS CDC load
    Marks every row of df as an update (Op = 'U') and writes it
    to the DMS sink.
    :param table_name: Source Table Name
    :param df: Source Table Data
    :return: Location
    """
    df_cdc_load = (
        df
        .withColumn(
            "Op",
            lit('U')
        )
        .withColumn(
            'dms_export_timestamp',
            current_timestamp()
        )
    )
    location = get_location(
        actor='DMS',
        io='SINK',
        table_name=table_name,
        action='cdc_load'
    )
    load_parquet(
        df=df_cdc_load,
        loc=location
    )
    print(f'''
        Successfully written Mock DMS full initial load
        for table {table_name} on location {location}
    ''')

    return location


In [33]:
df_dimension_table = create_mock_dimension_table_dataframe(
    spark,
    1000
)



        dataframe of count 1000 
        was created from faker as 2024-06-29 13:35:54.350273
    


In [34]:
df_dimension_table.show(1, False, True)

-RECORD 0------------------------------------------------------
 name           | Michael Hunt
 address        | 4258 Salazar Shoal\nWest Adamchester, IN 52592

In [35]:
loc_db_initial_state = load_mock_db_initial_state(
    spark,
    df_dimension_table,
    TABLES[1]['name']
)



        Location: tests/test_data/db_data/demo/initial_state/2024_06_29-01:35:56_PM
            for DMS
            on SOURCE 
            for demo 
            on initial_state
            at 2024_06_29-01:35:56_PM
    

        Successfully written 1 file
        for on location tests/test_data/db_data/demo/initial_state/2024_06_29-01:35:56_PM
    

        Successfully written Mock DB initial state
        for table demo on location tests/test_data/db_data/demo/initial_state/2024_06_29-01:35:56_PM
    


In [36]:
df_mock_db_initial_state = extract_parquet(spark, loc_db_initial_state)



        dataframe of count 1000 '
        was read from location tests/test_data/db_data/demo/initial_state/2024_06_29-01:35:56_PM'
    


In [37]:
loc_mock_dms_data = load_mock_dms_full_load(
    TABLES[1]['name'],
    df_mock_db_initial_state
)


        Location: tests/test_data/dms_sink/demo/full_load/2024_06_29-01:35:57_PM
            for DMS
            on SINK 
            for demo 
            on full_load
            at 2024_06_29-01:35:57_PM
    

        Successfully written 1 file
        for on location tests/test_data/dms_sink/demo/full_load/2024_06_29-01:35:57_PM
    

        Successfully written Mock DMS full initial load
        for table demo on location tests/test_data/dms_sink/demo/full_load/2024_06_29-01:35:57_PM
    


In [46]:
loc_mock_dms_data = load_mock_dms_cdc_load(
    TABLES[1]['name'],
    df_mock_db_initial_state
)


        Location: tests/test_data/dms_sink/demo/cdc_load/2024_06_29-01:47:34_PM
            for DMS
            on SINK 
            for demo 
            on cdc_load
            at 2024_06_29-01:47:34_PM
    

        Successfully written 1 file
        for on location tests/test_data/dms_sink/demo/cdc_load/2024_06_29-01:47:34_PM
    

        Successfully written Mock DMS full initial load
        for table demo on location tests/test_data/dms_sink/demo/cdc_load/2024_06_29-01:47:34_PM
    


In [47]:
df_mock_dms_data = extract_parquet(spark, loc_mock_dms_data)



        dataframe of count 1000 '
        was read from location tests/test_data/dms_sink/demo/cdc_load/2024_06_29-01:47:34_PM'
    


In [39]:
%%sql

DROP TABLE IF EXISTS nyc.demo_single_big_file

In [49]:
df_mock_dms_data.writeTo("nyc.demo_single_big_file").append()



In [50]:
%%sql

select count(*) from nyc.demo_single_big_file 

count(1)
2000


In [53]:
%%sql
WITH windowed_changes AS (
    SELECT
        op,
        uuid,
        updated_at,
        row_number() OVER (
            PARTITION BY uuid
            ORDER BY updated_at DESC) AS row_num
    FROM nyc.demo_single_big_file
),
mirror as (
    SELECT uuid, updated_at, op
    FROM windowed_changes WHERE row_num = 1 AND op != 'D'
)
select count(*) from mirror where op='U'

count(1)
1000
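
The query above keeps the most recent change per `uuid` and then drops deletes. The same logic can be sketched in plain Python, which may help when reasoning about edge cases. The records below are hypothetical and only mimic the `uuid`, `op`, and `updated_at` columns; `mirror` is an illustrative helper, not part of the notebook.

```python
from datetime import datetime

# Hypothetical change records shaped like rows in nyc.demo_single_big_file.
changes = [
    {"uuid": "a", "op": "I", "updated_at": datetime(2024, 6, 29, 13, 0)},
    {"uuid": "a", "op": "U", "updated_at": datetime(2024, 6, 29, 13, 5)},
    {"uuid": "b", "op": "I", "updated_at": datetime(2024, 6, 29, 13, 0)},
    {"uuid": "b", "op": "D", "updated_at": datetime(2024, 6, 29, 13, 7)},
    {"uuid": "c", "op": "I", "updated_at": datetime(2024, 6, 29, 13, 1)},
]


def mirror(changes):
    """Return the latest change per uuid, excluding keys whose latest op is 'D'."""
    latest = {}
    for change in changes:
        current = latest.get(change["uuid"])
        # Strictly newer wins; on a tie the first record seen is kept.
        if current is None or change["updated_at"] > current["updated_at"]:
            latest[change["uuid"]] = change
    return {key: c for key, c in latest.items() if c["op"] != "D"}


result = mirror(changes)
print({key: c["op"] for key, c in result.items()})
```

One thing the sketch makes visible: when two changes for the same key share an `updated_at`, the winner depends on tie-breaking. In SQL, `row_number()` picks among ties arbitrarily, so adding a second ordering column such as `dms_export_timestamp` may be worth considering.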


## Schema Evolution

Adding, dropping, renaming, or altering columns is easy and safe in Iceberg. In this example, we'll rename `fare_amount` to `fare` and `trip_distance` to `distance`. We'll also add a float column `fare_per_distance_unit` immediately after `distance`.

In [None]:
%%sql

ALTER TABLE nyc.taxis RENAME COLUMN fare_amount TO fare

In [None]:
%%sql

ALTER TABLE nyc.taxis RENAME COLUMN trip_distance TO distance

In [None]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'

In [None]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance TYPE double;

In [None]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance AFTER fare;

In [None]:
%%sql

ALTER TABLE nyc.taxis
ADD COLUMN fare_per_distance_unit float AFTER distance
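
Iceberg tracks columns by numeric field ID rather than by name, which is why a rename only touches metadata and never rewrites data files. The following is a toy model of that idea, not Iceberg's implementation: the field IDs, the row values, and the helpers `rename_column` and `read_column` are all made up for illustration.

```python
# Toy model: data files store values by field ID; the schema maps names to IDs.
schema = {"fare_amount": 11, "trip_distance": 5}      # hypothetical field IDs
data_file = [{11: 12.5, 5: 2.0}, {11: 7.0, 5: 1.0}]   # made-up rows keyed by field ID


def rename_column(schema, old, new):
    """Return a new schema with `old` renamed to `new`; the field ID is unchanged."""
    return {(new if name == old else name): field_id
            for name, field_id in schema.items()}


def read_column(schema, rows, name):
    """Resolve the column name to its field ID, then read values by ID."""
    field_id = schema[name]
    return [row[field_id] for row in rows]


before = read_column(schema, data_file, "fare_amount")
schema = rename_column(schema, "fare_amount", "fare")
schema = rename_column(schema, "trip_distance", "distance")
after = read_column(schema, data_file, "fare")

# The data file was never modified, yet the renamed column reads the same values.
print(before == after, sorted(schema))
```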

Let's update the new `fare_per_distance_unit` to equal `fare` divided by `distance`.

In [None]:
%%sql

UPDATE nyc.taxis
SET fare_per_distance_unit = fare/distance

In [None]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

## Expressive SQL for Row Level Changes
With Iceberg tables, `DELETE` queries can be used to perform row-level deletes. This is as simple as providing the table name and a `WHERE` predicate. If the filter matches an entire partition of the table, Iceberg will intelligently perform a metadata-only operation where it simply deletes the metadata for that partition.

Let's perform a row-level delete for all rows that have a `fare_per_distance_unit` greater than 4 or a `distance` greater than 2. This should leave us with relatively short trips that have a relatively low fare per distance traveled.

In [None]:
%%sql

DELETE FROM nyc.taxis
WHERE fare_per_distance_unit > 4.0 OR distance > 2.0

There are some fares that have a `null` for `fare_per_distance_unit` due to the distance being `0`. Let's remove those as well.

In [None]:
%%sql

DELETE FROM nyc.taxis
WHERE fare_per_distance_unit is null
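
The reason the first `DELETE` leaves the `null` rows behind is SQL's three-valued logic: a comparison against `null` yields `null`, and `DELETE` only removes rows where the predicate is true. A minimal plain-Python sketch of that behavior, using `None` for `null` and a few made-up rows (the helper names are illustrative only):

```python
def sql_gt(value, threshold):
    """SQL-style `value > threshold`: a NULL (None) operand yields NULL."""
    return None if value is None else value > threshold


def sql_or(left, right):
    """SQL three-valued OR over True / False / None (NULL)."""
    if left is True or right is True:
        return True
    if left is None or right is None:
        return None
    return False


def first_delete_matches(row):
    """Model of: WHERE fare_per_distance_unit > 4.0 OR distance > 2.0"""
    predicate = sql_or(sql_gt(row["fare_per_distance_unit"], 4.0),
                       sql_gt(row["distance"], 2.0))
    return predicate is True  # NULL does not count as a match


# Made-up rows; the last one mimics a zero-distance trip with a null ratio.
rows = [
    {"fare_per_distance_unit": 3.0, "distance": 1.5},
    {"fare_per_distance_unit": 5.0, "distance": 1.0},
    {"fare_per_distance_unit": 2.0, "distance": 10.0},
    {"fare_per_distance_unit": None, "distance": 0.0},
]

after_first = [r for r in rows if not first_delete_matches(r)]
after_second = [r for r in after_first if r["fare_per_distance_unit"] is not None]
print(len(after_first), len(after_second))
```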

In [None]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

In [None]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis

## Partitioning

A table’s partitioning can be updated in place and applied only to newly written data. Query plans are then split, using the old partition scheme for data written before the partition scheme was changed, and using the new partition scheme for data written after. People querying the table don’t even have to be aware of this split. Simple predicates in WHERE clauses are automatically converted to partition filters that prune out files with no matches. This is what’s referred to in Iceberg as *Hidden Partitioning*.

In [None]:
%%sql

ALTER TABLE nyc.taxis
ADD PARTITION FIELD VendorID
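
To make the split planning described above concrete, here is a toy model in plain Python. It is a sketch under simplifying assumptions, not Iceberg's planner: `DataFile`, `plan_scan`, the spec IDs, and the file paths are hypothetical, and real Iceberg can additionally skip files using column statistics.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DataFile:
    path: str
    spec_id: int                     # partition spec the file was written under
    vendor_id: Optional[int] = None  # partition value; None under the old spec


def plan_scan(files, vendor_id):
    """Pick the files a `WHERE VendorID = <vendor_id>` query has to read."""
    selected = []
    for f in files:
        if f.spec_id == 0:
            # Old, unpartitioned spec: there is no partition value to test,
            # so this toy model always keeps the file.
            selected.append(f)
        elif f.vendor_id == vendor_id:
            # New spec: the predicate becomes a partition filter.
            selected.append(f)
    return selected


files = [
    DataFile("old-0.parquet", spec_id=0),
    DataFile("new-vendor-1.parquet", spec_id=1, vendor_id=1),
    DataFile("new-vendor-2.parquet", spec_id=1, vendor_id=2),
]
print([f.path for f in plan_scan(files, vendor_id=2)])
```

The query itself only mentions `VendorID`; which spec each file belongs to is handled during planning, which is the point of hidden partitioning.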

## Metadata Tables

Iceberg tables contain very rich metadata that can be easily queried. For example, you can retrieve the manifest list for any snapshot by querying the table's `snapshots` metadata table.

In [None]:
%%sql

SELECT snapshot_id, manifest_list
FROM nyc.taxis.snapshots

The `files` table contains loads of information on data files, including column level statistics such as null counts, lower bounds, and upper bounds.

In [None]:
%%sql

SELECT file_path, file_format, record_count, null_value_counts, lower_bounds, upper_bounds
FROM nyc.taxis.files

## Time Travel

The `history` table lists all snapshots and which parent snapshot they derive from. The `is_current_ancestor` flag lets you know if a snapshot is part of the linear history of the current snapshot of the table.

In [None]:
%%sql

SELECT *
FROM nyc.taxis.history

You can time-travel by rolling the table back to any snapshot in its history with the `rollback_to_snapshot` procedure. Let's revert the table to its original state by rolling back to the very first snapshot ID.

In [None]:
%%sql --var df

SELECT *
FROM nyc.taxis.history

In [None]:
original_snapshot = df.head().snapshot_id
spark.sql(f"CALL system.rollback_to_snapshot('nyc.taxis', {original_snapshot})")
original_snapshot

In [None]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

Another look at the `history` table shows that the original state of the table has been added as a new entry with the original snapshot ID.

In [None]:
%%sql

SELECT *
FROM nyc.taxis.history
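
The effect of the rollback on the `history` table can be modeled in a few lines of plain Python. This is a simplified sketch with made-up snapshot IDs: `ancestors` and `history_view` are hypothetical helpers, and the log holds only snapshot IDs rather than full history rows.

```python
def ancestors(snapshot_id, parents):
    """Walk parent links from snapshot_id back to the root."""
    seen = set()
    while snapshot_id is not None:
        seen.add(snapshot_id)
        snapshot_id = parents.get(snapshot_id)
    return seen


def history_view(log, parents):
    """Pair each history entry with an is_current_ancestor-style flag."""
    lineage = ancestors(log[-1], parents)  # the last entry is the current snapshot
    return [(snapshot_id, snapshot_id in lineage) for snapshot_id in log]


# Made-up IDs: 1 is the initial write, 2 and 3 are later changes.
parents = {1: None, 2: 1, 3: 2}
log = [1, 2, 3]
before_rollback = history_view(log, parents)

# Rolling back to snapshot 1 appends it to the log again.
log.append(1)
after_rollback = history_view(log, parents)

print(before_rollback)
print(after_rollback)
```

After the rollback, snapshots 2 and 3 are still listed but no longer lie on the current snapshot's lineage, while the original snapshot appears twice.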

In [None]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis