# Trino / Iceberg connector data management demo sample
Copyright (C) 2021 OS-Climate

This sample shows:
* How to create schemas and tables via Trino and SQLAlchemy on an underlying Iceberg data volume
* Apache Iceberg ACID transaction and time travel capabilities used for data set versioning


Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

In [1]:
%%capture pipoutput
# Install the packages this notebook needs into the kernel's environment.
# The %%capture cell magic above stores the cell output in `pipoutput`
# instead of displaying it.
%pip install boto3 python-dotenv
# SQLAlchemy is pinned to 1.3 here; the other packages are unpinned.
%pip install --upgrade sqlalchemy==1.3 sqlalchemy-trino
%pip install pandas pyarrow fastparquet
%pip install anytree
%pip install osc-ingest-tools

In [2]:
from dotenv import dotenv_values, load_dotenv
from osc_ingest_trino import *
import os
import pathlib
import pandas as pd
import uuid
import trino
from sqlalchemy.engine import create_engine

Load Environment Variables

In [3]:
# Directory holding credentials.env: CREDENTIAL_DOTENV_DIR takes precedence,
# then PWD, then the fallback path.
dotenv_dir = os.environ.get(
    'CREDENTIAL_DOTENV_DIR',
    os.environ.get('PWD', '/opt/app-root/src'),
)
dotenv_path = pathlib.Path(dotenv_dir).joinpath('credentials.env')
# Load the file only when it is present; override=True lets its values
# replace variables that are already set in the environment.
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path, override=True)

Create a simple data frame for testing

In [4]:
# Sample power plants and their capacity, one list per column
data = {
    'name': ['Albas', 'Albi', 'Albias', 'Allaire'],
    'capacity_gwh': [1.8, 6.70448, 2.41, 8.2],
}
# A single identifier shared by every row tags this version of the data set
uid = str(uuid.uuid4())
# Build the frame, add the identifier column and convert the columns to
# pandas' inferred extension dtypes
df = pd.DataFrame(data).assign(uuid=uid).convert_dtypes()
df

Unnamed: 0,name,capacity_gwh,uuid
0,Albas,1.8,32a98e7e-0f8e-4c76-86bb-7e598928d859
1,Albi,6.70448,32a98e7e-0f8e-4c76-86bb-7e598928d859
2,Albias,2.41,32a98e7e-0f8e-4c76-86bb-7e598928d859
3,Allaire,8.2,32a98e7e-0f8e-4c76-86bb-7e598928d859


In [5]:
# Show the column dtypes and non-null counts of the sample frame
df.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   name          4 non-null      string 
 1   capacity_gwh  4 non-null      Float64
 2   uuid          4 non-null      string 
dtypes: Float64(1), string(2)
memory usage: 228.0 bytes


Create custom metadata and declare variables for the schema and table of the data set

In [6]:
# Target schema and table for the sample data set
schemaname = 'wri_test'
tablename = 'gppd_new'

# Data set level metadata plus one entry per documented field
custom_meta_content = dict(
    dataset_key='gppd',
    title='Global Power Plant Database',
    description='A comprehensive, global, open source database of power plants',
    version='1.3.0',
    release_date='20210602',
    fields=[
        dict(
            field_name='name',
            definition='name of the power plant',
            type='text',
        ),
        dict(
            field_name='capacity_gwh',
            definition='electricity generation in gigawatt-hours',
            type='number',
        ),
    ],
)

Convert the custom metadata content (a JSON-style dictionary) into pandas DataFrames

In [7]:
# Flatten the field-level metadata: one row per entry of 'fields', with the
# data set key repeated on every row
df_meta_fields = pd.json_normalize(
    custom_meta_content,
    record_path=['fields'],
    meta=['dataset_key'],
)
df_meta_fields = df_meta_fields.convert_dtypes()
df_meta_fields

Unnamed: 0,field_name,definition,type,dataset_key
0,name,name of the power plant,text,gppd
1,capacity_gwh,electricity generation in gigawatt-hours,number,gppd


In [8]:
# Show the column dtypes and non-null counts of the field-level metadata frame
df_meta_fields.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2 entries, 0 to 1
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   field_name   2 non-null      string
 1   definition   2 non-null      string
 2   type         2 non-null      string
 3   dataset_key  2 non-null      string
dtypes: string(4)
memory usage: 192.0 bytes


In [9]:
# Data set level metadata as a single-row frame: keep only the top-level keys,
# drop the nested 'fields' entry and record the target schema name
df_meta_table = (
    pd.json_normalize(custom_meta_content, max_level=0)
    .drop(columns='fields')
    .assign(schema=schemaname)
    .convert_dtypes()
)
df_meta_table

Unnamed: 0,dataset_key,title,description,version,release_date,schema
0,gppd,Global Power Plant Database,"A comprehensive, global, open source database ...",1.3.0,20210602,wri_test


In [10]:
# Show the column dtypes and non-null counts of the data set level metadata frame
df_meta_table.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   dataset_key   1 non-null      string
 1   title         1 non-null      string
 2   description   1 non-null      string
 3   version       1 non-null      string
 4   release_date  1 non-null      string
 5   schema        1 non-null      string
dtypes: string(6)
memory usage: 176.0 bytes


Open a Trino connection using JWT for authentication

In [11]:
# Connection URL assembled from the TRINO_USER / TRINO_HOST / TRINO_PORT
# environment variables (a missing variable raises KeyError)
sqlstring = 'trino://{}@{}:{}/'.format(
    os.environ['TRINO_USER'],
    os.environ['TRINO_HOST'],
    os.environ['TRINO_PORT'],
)
# Authenticate with the JWT token stored in TRINO_PASSWD, over HTTPS
sqlargs = dict(
    auth=trino.auth.JWTAuthentication(os.environ['TRINO_PASSWD']),
    http_scheme='https',
)
engine = create_engine(sqlstring, connect_args=sqlargs)
print("connecting with engine {}".format(engine))
connection = engine.connect()

connecting with engine Engine(trino://caldeirav@trino-secure-odh-trino.apps.odh-cl1.apps.os-climate.org:443/)


In [12]:
# Show available schemas to ensure trino connection is set correctly:
# list every schema of the osc_datacommons_iceberg_dev catalog, one row per line
schema_read = engine.execute('show schemas in osc_datacommons_iceberg_dev')
for row in schema_read.fetchall():
    print(row)

('aicoe_osc_demo',)
('company_data',)
('default',)
('defaultschema1',)
('demo',)
('eje_test_iceberg',)
('epacems',)
('epacems_y95_al',)
('gleif',)
('information_schema',)
('metastore',)
('metastore_iceberg',)
('osc_corp_data',)
('physical_risk_project',)
('pudl',)
('rmi_utility_transition_hub',)
('team1',)
('team2',)
('testaccessschema1',)
('testdb',)
('urgentem',)
('wri',)
('wri_gppd',)
('wri_gppd_md',)
('wri_test',)


Create ingestion schema based on source data name and remove old tables if necessary

In [13]:
# Create the ingestion schema unless it already exists, then print the result rows
create_schema_sql = 'create schema if not exists osc_datacommons_iceberg_dev.{}'.format(schemaname)
schema_check = engine.execute(create_schema_sql)
for row in schema_check.fetchall():
    print(row)

(True,)


In [14]:
# Remove a table left over from a previous run so the demo starts from scratch
drop_table_sql = 'drop table if exists osc_datacommons_iceberg_dev.{}.{}'.format(schemaname, tablename)
table_check = engine.execute(drop_table_sql)
for row in table_check.fetchall():
    print(row)

(True,)


In [15]:
# Column definitions for the DDL, generated from the data frame by the
# osc_ingest_trino helper
schema = create_table_schema_pairs(df)
# DDL template for a parquet table partitioned by the data set uuid
ddl_template = """
create table if not exists osc_datacommons_iceberg_dev.{sname}.{tname} (
{schema}
) with (
    format = 'parquet',
    partitioning = ARRAY['uuid']
)
"""
tabledef = ddl_template.format(sname=schemaname, tname=tablename, schema=schema)
print(tabledef)


create table if not exists osc_datacommons_iceberg_dev.wri_test.gppd_new (
    name varchar,
    capacity_gwh double,
    uuid varchar
) with (
    format = 'parquet',
    partitioning = ARRAY['uuid']
)



In [16]:
# Run the CREATE TABLE statement built in `tabledef` and print the result rows
table_create = engine.execute(tabledef)
for row in table_create.fetchall():
    print(row)

(True,)


In [17]:
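# Appending the data frame to the new Trino table with pandas `to_sql` should work,
# but it fails with TrinoUserError: "This connector does not support creating tables",
# so the rows are inserted with explicit INSERT statements in the following cells instead.
# df.to_sql('gppd', con=engine, schema='wri_test', if_exists='append')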

In [18]:
# Rows of the data frame as plain Python lists (one inner list per row);
# display the first row as a sanity check
list_values = df.to_numpy().tolist()
list_values[0]

['Albas', 1.8, '32a98e7e-0f8e-4c76-86bb-7e598928d859']

In [19]:
# Insert the rows one statement at a time; every INSERT produces its own
# Iceberg snapshot of the table.
for name, capacity_gwh, dataset_uuid in list_values:
    # Double embedded single quotes so that a value containing an apostrophe
    # stays a valid SQL string literal instead of ending the literal early.
    joined_values = "('{}', {}, '{}')".format(
        name.replace("'", "''"),
        capacity_gwh,
        dataset_uuid.replace("'", "''"),
    )
    insert_statement = """INSERT INTO osc_datacommons_iceberg_dev.{sname}.{tname} 
    VALUES """.format(sname=schemaname, tname=tablename) + joined_values
    print(insert_statement)
    run_statement = engine.execute(insert_statement)
    # The result rows of an INSERT carry the number of rows written
    for row in run_statement.fetchall():
        print(row)

INSERT INTO osc_datacommons_iceberg_dev.wri_test.gppd_new 
    VALUES ('Albas', 1.8, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
(1,)
INSERT INTO osc_datacommons_iceberg_dev.wri_test.gppd_new 
    VALUES ('Albi', 6.70448, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
(1,)
INSERT INTO osc_datacommons_iceberg_dev.wri_test.gppd_new 
    VALUES ('Albias', 2.41, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
(1,)
INSERT INTO osc_datacommons_iceberg_dev.wri_test.gppd_new 
    VALUES ('Allaire', 8.2, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
(1,)


In [20]:
# Read back up to ten rows of the freshly loaded table; the query text is kept
# in `dataset_query` so later cells can run it again
dataset_query = 'SELECT * FROM osc_datacommons_iceberg_dev.{sname}.{tname} limit 10'.format(
    sname=schemaname,
    tname=tablename,
)
print(dataset_query)
dataset = engine.execute(dataset_query)
for row in dataset.fetchall():
    print(row)

SELECT * FROM osc_datacommons_iceberg_dev.wri_test.gppd_new limit 10
('Albas', 1.8, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Albi', 6.70448, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Albias', 2.41, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Allaire', 8.2, '32a98e7e-0f8e-4c76-86bb-7e598928d859')


Query the Iceberg snapshots of the WRI GPPD data set. A snapshot is an immutable view of the table's data at a given point in time; one is created automatically on every append or removal of data.

In [21]:
# List the table's snapshots from its "$snapshots" metadata table.
# ORDER BY committed_at makes the listing deterministic: without it the row
# order is unspecified, while the walkthrough reads the snapshots in commit order.
# The query text is kept in `snapshot_query` so later cells can run it again.
snapshot_query = (
    'SELECT committed_at, snapshot_id, parent_id '
    'FROM osc_datacommons_iceberg_dev.{sname}."{tname}$snapshots" '
    'ORDER BY committed_at'
).format(sname=schemaname, tname=tablename)
print(snapshot_query)
dataset = engine.execute(snapshot_query)
for row in dataset.fetchall():
    print(row)

SELECT committed_at, snapshot_id, parent_id FROM osc_datacommons_iceberg_dev.wri_test."gppd_new$snapshots"
('2021-11-17 15:09:12.459 UTC', 9025854779685413976, None)
('2021-11-17 15:09:21.143 UTC', 4181945915156604382, 9025854779685413976)
('2021-11-17 15:09:22.030 UTC', 302793385755985542, 4181945915156604382)
('2021-11-17 15:09:22.874 UTC', 4237598940379949314, 302793385755985542)
('2021-11-17 15:09:23.879 UTC', 7161852363068757534, 4237598940379949314)


Create metadata table for schema / dataset level information

In [22]:
# declare variable names for metadata structure in Trino
meta_schema_name = 'metastore_iceberg'  # schema that holds both metadata tables
meta_table_name_dataset = 'meta_tables_iceberg'  # table for data set level metadata
meta_table_name_fields = 'meta_fields_iceberg'  # table for field level metadata

In [23]:
# Create the metadata schema unless it already exists, then print the result rows
create_meta_schema_sql = 'create schema if not exists osc_datacommons_iceberg_dev.{}'.format(meta_schema_name)
schema_check = engine.execute(create_meta_schema_sql)
for row in schema_check.fetchall():
    print(row)

(True,)


In [24]:
# Remove the data set level metadata table if a previous run left one behind
drop_meta_table_sql = 'drop table if exists osc_datacommons_iceberg_dev.{}.{}'.format(
    meta_schema_name,
    meta_table_name_dataset,
)
table_check = engine.execute(drop_meta_table_sql)
for row in table_check.fetchall():
    print(row)

(True,)


In [25]:
# Column definitions for the DDL, generated from the data set level metadata
# frame by the osc_ingest_trino helper
schema_meta_table = create_table_schema_pairs(df_meta_table)
# DDL template for a parquet table partitioned by the data set key
ddl_template = """
create table if not exists osc_datacommons_iceberg_dev.{sname}.{tname} (
{schema}
) with (
    format = 'parquet',
    partitioning = ARRAY['dataset_key']
)
"""
tabledef = ddl_template.format(
    sname=meta_schema_name,
    tname=meta_table_name_dataset,
    schema=schema_meta_table,
)
print(tabledef)


create table if not exists osc_datacommons_iceberg_dev.metastore_iceberg.meta_tables_iceberg (
    dataset_key varchar,
    title varchar,
    description varchar,
    version varchar,
    release_date varchar,
    schema varchar
) with (
    format = 'parquet',
    partitioning = ARRAY['dataset_key']
)



In [26]:
# Run the CREATE TABLE statement currently held in `tabledef` (the data set
# level metadata table) and print the result rows
meta_table_create = engine.execute(tabledef)
for row in meta_table_create.fetchall():
    print(row)

(True,)


In [27]:
# Rows of the data set level metadata frame as plain Python lists;
# display the first (and only) row as a sanity check
list_values_meta_table = df_meta_table.to_numpy().tolist()
list_values_meta_table[0]

['gppd',
 'Global Power Plant Database',
 'A comprehensive, global, open source database of power plants',
 '1.3.0',
 '20210602',
 'wri_test']

In [28]:
# Render every value of the metadata row as a SQL string literal. Embedded
# single quotes are doubled so that a title or description containing an
# apostrophe does not end the literal early and break the statement.
meta_row = list_values_meta_table[0]
joined_values = '(' + ', '.join(
    "'" + value.replace("'", "''") + "'" for value in meta_row
) + ')'
insert_statement = """INSERT INTO osc_datacommons_iceberg_dev.{sname}.{tname} 
VALUES """.format(sname=meta_schema_name,tname=meta_table_name_dataset) + joined_values
print(insert_statement)
run_statement = engine.execute(insert_statement)
# The result rows of an INSERT carry the number of rows written
for row in run_statement.fetchall():
    print(row)

INSERT INTO osc_datacommons_iceberg_dev.metastore_iceberg.meta_tables_iceberg 
VALUES ('gppd', 'Global Power Plant Database', 'A comprehensive, global, open source database of power plants', '1.3.0', '20210602', 'wri_test')
(1,)


In [29]:
# Read back up to ten rows of the data set level metadata table
meta_query_table = 'SELECT * FROM osc_datacommons_iceberg_dev.{sname}.{tname} limit 10'.format(
    sname=meta_schema_name,
    tname=meta_table_name_dataset,
)
print(meta_query_table)
meta_table_query = engine.execute(meta_query_table)
for row in meta_table_query.fetchall():
    print(row)

SELECT * FROM osc_datacommons_iceberg_dev.metastore_iceberg.meta_tables_iceberg limit 10
('gppd', 'Global Power Plant Database', 'A comprehensive, global, open source database of power plants', '1.3.0', '20210602', 'wri_test')


Create metadata table for fields information

In [30]:
# Remove the field level metadata table if a previous run left one behind
drop_meta_fields_sql = 'drop table if exists osc_datacommons_iceberg_dev.{}.{}'.format(
    meta_schema_name,
    meta_table_name_fields,
)
table_check = engine.execute(drop_meta_fields_sql)
for row in table_check.fetchall():
    print(row)

(True,)


In [31]:
# Column definitions for the DDL, generated from the field level metadata
# frame by the osc_ingest_trino helper
schema_meta_fields = create_table_schema_pairs(df_meta_fields)
# DDL template for a parquet table partitioned by the data set key
ddl_template = """
create table if not exists osc_datacommons_iceberg_dev.{sname}.{tname} (
{schema}
) with (
    format = 'parquet',
    partitioning = ARRAY['dataset_key']
)
"""
tabledef = ddl_template.format(
    sname=meta_schema_name,
    tname=meta_table_name_fields,
    schema=schema_meta_fields,
)
print(tabledef)


create table if not exists osc_datacommons_iceberg_dev.metastore_iceberg.meta_fields_iceberg (
    field_name varchar,
    definition varchar,
    type varchar,
    dataset_key varchar
) with (
    format = 'parquet',
    partitioning = ARRAY['dataset_key']
)



In [32]:
# Run the CREATE TABLE statement currently held in `tabledef` (the field level
# metadata table) and print the result rows
meta_fields_create = engine.execute(tabledef)
for row in meta_fields_create.fetchall():
    print(row)

(True,)


In [33]:
# Rows of the field level metadata frame as plain Python lists;
# display the first row as a sanity check
list_values_meta_fields = df_meta_fields.to_numpy().tolist()
list_values_meta_fields[0]

['name', 'name of the power plant', 'text', 'gppd']

In [34]:
# Insert the field level metadata one row per statement.
for field_row in list_values_meta_fields:
    # Render each value as a SQL string literal; embedded single quotes are
    # doubled so that a definition containing an apostrophe stays valid SQL.
    joined_values = '(' + ', '.join(
        "'" + value.replace("'", "''") + "'" for value in field_row
    ) + ')'
    insert_statement = """INSERT INTO osc_datacommons_iceberg_dev.{sname}.{tname} 
    VALUES """.format(sname=meta_schema_name, tname=meta_table_name_fields) + joined_values
    print(insert_statement)
    run_statement = engine.execute(insert_statement)
    # The result rows of an INSERT carry the number of rows written
    for row in run_statement.fetchall():
        print(row)

INSERT INTO osc_datacommons_iceberg_dev.metastore_iceberg.meta_fields_iceberg 
    VALUES ('name', 'name of the power plant', 'text', 'gppd')
(1,)
INSERT INTO osc_datacommons_iceberg_dev.metastore_iceberg.meta_fields_iceberg 
    VALUES ('capacity_gwh', 'electricity generation in gigawatt-hours', 'number', 'gppd')
(1,)


In [35]:
# Read back up to ten rows of the field level metadata table
meta_query_fields = 'SELECT * FROM osc_datacommons_iceberg_dev.{sname}.{tname} limit 10'.format(
    sname=meta_schema_name,
    tname=meta_table_name_fields,
)
print(meta_query_fields)
meta_fields_query = engine.execute(meta_query_fields)
for row in meta_fields_query.fetchall():
    print(row)

SELECT * FROM osc_datacommons_iceberg_dev.metastore_iceberg.meta_fields_iceberg limit 10
('name', 'name of the power plant', 'text', 'gppd')
('capacity_gwh', 'electricity generation in gigawatt-hours', 'number', 'gppd')


Update the source data to create a new data set for ingestion

In [36]:
# Updated capacity figures for the same four power plants
new_data = {
    'name': ['Albas', 'Albi', 'Albias', 'Allaire'],
    'capacity_gwh': [2.8, 6.90448, 2.20, 7.4],
}
# A fresh identifier distinguishes this version from the first data set
uid = str(uuid.uuid4())
# Build the frame, add the identifier column and convert the columns to
# pandas' inferred extension dtypes
new_df = pd.DataFrame(new_data).assign(uuid=uid).convert_dtypes()
new_df

Unnamed: 0,name,capacity_gwh,uuid
0,Albas,2.8,62f65f0d-6dcc-4265-8be1-c181945de9c2
1,Albi,6.90448,62f65f0d-6dcc-4265-8be1-c181945de9c2
2,Albias,2.2,62f65f0d-6dcc-4265-8be1-c181945de9c2
3,Allaire,7.4,62f65f0d-6dcc-4265-8be1-c181945de9c2


In [37]:
# Rows of the updated data frame as plain Python lists;
# display the first row as a sanity check
new_list_values = new_df.to_numpy().tolist()
new_list_values[0]

['Albas', 2.8, '62f65f0d-6dcc-4265-8be1-c181945de9c2']

Ingest new data set

In [38]:
# Insert the second data set one statement at a time; every INSERT produces
# its own Iceberg snapshot of the table.
for name, capacity_gwh, dataset_uuid in new_list_values:
    # Double embedded single quotes so that a value containing an apostrophe
    # stays a valid SQL string literal instead of ending the literal early.
    joined_values = "('{}', {}, '{}')".format(
        name.replace("'", "''"),
        capacity_gwh,
        dataset_uuid.replace("'", "''"),
    )
    insert_statement = """INSERT INTO osc_datacommons_iceberg_dev.{sname}.{tname} 
    VALUES """.format(sname=schemaname, tname=tablename) + joined_values
    print(insert_statement)
    run_statement = engine.execute(insert_statement)
    # The result rows of an INSERT carry the number of rows written
    for row in run_statement.fetchall():
        print(row)

INSERT INTO osc_datacommons_iceberg_dev.wri_test.gppd_new 
    VALUES ('Albas', 2.8, '62f65f0d-6dcc-4265-8be1-c181945de9c2')
(1,)
INSERT INTO osc_datacommons_iceberg_dev.wri_test.gppd_new 
    VALUES ('Albi', 6.90448, '62f65f0d-6dcc-4265-8be1-c181945de9c2')
(1,)
INSERT INTO osc_datacommons_iceberg_dev.wri_test.gppd_new 
    VALUES ('Albias', 2.2, '62f65f0d-6dcc-4265-8be1-c181945de9c2')
(1,)
INSERT INTO osc_datacommons_iceberg_dev.wri_test.gppd_new 
    VALUES ('Allaire', 7.4, '62f65f0d-6dcc-4265-8be1-c181945de9c2')
(1,)


Query data and Iceberg snapshot

In [39]:
# Re-run the stored data set query (limited to ten rows); the table now
# contains the rows of both data sets
print(dataset_query)
dataset = engine.execute(dataset_query)
for row in dataset.fetchall():
    print(row)

SELECT * FROM osc_datacommons_iceberg_dev.wri_test.gppd_new limit 10
('Albi', 6.90448, '62f65f0d-6dcc-4265-8be1-c181945de9c2')
('Allaire', 7.4, '62f65f0d-6dcc-4265-8be1-c181945de9c2')
('Albi', 6.70448, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Albias', 2.41, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Allaire', 8.2, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Albas', 2.8, '62f65f0d-6dcc-4265-8be1-c181945de9c2')
('Albas', 1.8, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Albias', 2.2, '62f65f0d-6dcc-4265-8be1-c181945de9c2')


In [40]:
# Re-run the stored snapshot query; the inserts of the second data set show
# up as additional snapshots
print(snapshot_query)
dataset = engine.execute(snapshot_query)
for row in dataset.fetchall():
    print(row)

SELECT committed_at, snapshot_id, parent_id FROM osc_datacommons_iceberg_dev.wri_test."gppd_new$snapshots"
('2021-11-17 15:09:12.459 UTC', 9025854779685413976, None)
('2021-11-17 15:09:21.143 UTC', 4181945915156604382, 9025854779685413976)
('2021-11-17 15:09:22.030 UTC', 302793385755985542, 4181945915156604382)
('2021-11-17 15:09:22.874 UTC', 4237598940379949314, 302793385755985542)
('2021-11-17 15:09:23.879 UTC', 7161852363068757534, 4237598940379949314)
('2021-11-17 15:10:38.231 UTC', 4462538251865928736, 7161852363068757534)
('2021-11-17 15:10:39.343 UTC', 419656586561256577, 4462538251865928736)
('2021-11-17 15:10:40.250 UTC', 2845024834783547281, 419656586561256577)
('2021-11-17 15:10:41.306 UTC', 3968975231154658376, 2845024834783547281)


Query only the first data set (Iceberg time travel)

In [41]:
# Look up the snapshot to travel back to instead of hard-coding its id:
# snapshot ids differ on every run, so a literal id copied from an earlier
# run no longer exists once the table has been dropped and recreated.
snapshot_id_query = (
    'SELECT snapshot_id FROM osc_datacommons_iceberg_dev.{sname}."{tname}$snapshots" '
    'ORDER BY committed_at'
).format(sname=schemaname, tname=tablename)
snapshot_ids = [snap[0] for snap in engine.execute(snapshot_id_query).fetchall()]
# The second data set was written with one INSERT (= one snapshot) per row, so
# the last snapshot of the first data set sits right before those commits.
if len(snapshot_ids) <= len(new_list_values):
    raise RuntimeError('not enough snapshots to locate the first data set')
first_dataset_snapshot_id = snapshot_ids[-(len(new_list_values) + 1)]

# "<table>@<snapshot id>" reads the table as of that snapshot
past_dataset_query = 'SELECT * FROM osc_datacommons_iceberg_dev.{sname}."{tname}@{snapshot_id}"'.format(
    sname=schemaname,
    tname=tablename,
    snapshot_id=first_dataset_snapshot_id,
)
print(past_dataset_query)
dataset = engine.execute(past_dataset_query)
for row in dataset.fetchall():
    print(row)

SELECT * FROM osc_datacommons_iceberg_dev.wri_test."gppd_new@7161852363068757534"
('Albias', 2.41, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Albi', 6.70448, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Albas', 1.8, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Allaire', 8.2, '32a98e7e-0f8e-4c76-86bb-7e598928d859')


Roll back the second data set

In [42]:
# Look up the snapshot to roll back to instead of hard-coding its id:
# snapshot ids differ on every run, so a literal id copied from an earlier
# run no longer exists once the table has been dropped and recreated.
rollback_snapshot_query = (
    'SELECT snapshot_id FROM osc_datacommons_iceberg_dev.{sname}."{tname}$snapshots" '
    'ORDER BY committed_at'
).format(sname=schemaname, tname=tablename)
rollback_candidates = [snap[0] for snap in engine.execute(rollback_snapshot_query).fetchall()]
# The second data set was written with one INSERT (= one snapshot) per row, so
# the last snapshot of the first data set sits right before those commits.
if len(rollback_candidates) <= len(new_list_values):
    raise RuntimeError('not enough snapshots to locate the first data set')
rollback_snapshot_id = rollback_candidates[-(len(new_list_values) + 1)]

# Reset the table's current state to that snapshot
rollback_request = "CALL osc_datacommons_iceberg_dev.system.rollback_to_snapshot('{sname}', '{tname}', {snapshot_id})".format(
    sname=schemaname,
    tname=tablename,
    snapshot_id=rollback_snapshot_id,
)
print(rollback_request)
dataset = engine.execute(rollback_request)
for row in dataset.fetchall():
    print(row)

CALL osc_datacommons_iceberg_dev.system.rollback_to_snapshot('wri_test', 'gppd_new', 7161852363068757534)
(True,)


Query the full table again: the second data set has been rolled back

In [43]:
# Re-run the stored data set query to confirm the rollback: only the rows of
# the first data set should be returned
print(dataset_query)
dataset = engine.execute(dataset_query)
for row in dataset.fetchall():
    print(row)

SELECT * FROM osc_datacommons_iceberg_dev.wri_test.gppd_new limit 10
('Albi', 6.70448, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Albias', 2.41, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Allaire', 8.2, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
('Albas', 1.8, '32a98e7e-0f8e-4c76-86bb-7e598928d859')
