# Ingest SEC DERA data into Trino pipeline

Copyright (C) 2021 OS-Climate

This sample shows:
* How to create schemas and tables via Trino / SQLAlchemy on an underlying Iceberg data volume
* Apache Iceberg ACID transaction and time travel capabilities used for data set versioning


Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Contributed by Michael Tiemann (Github: MichaelTiemannOSC)

%%capture pipoutput
%pip install boto3 python-dotenv
%pip install --upgrade sqlalchemy==1.3 sqlalchemy-trino
%pip install pandas pyarrow fastparquet
%pip install anytree
%pip install osc-ingest-tools

In [None]:
from dotenv import dotenv_values, load_dotenv
from osc_ingest_trino import *
import os
import pathlib
import pandas as pd
import uuid
import trino
from sqlalchemy.engine import create_engine

Load Environment Variables

In [None]:
# Resolve the directory holding credentials.env: CREDENTIAL_DOTENV_DIR wins,
# then PWD, then the fixed fallback path.
fallback_dir = os.environ.get('PWD', '/opt/app-root/src')
dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', fallback_dir)
dotenv_path = pathlib.Path(dotenv_dir).joinpath('credentials.env')

# A missing credentials file is not an error; the environment is used as-is.
credentials_present = os.path.exists(dotenv_path)
if credentials_present:
    load_dotenv(dotenv_path=dotenv_path, override=True)

In [None]:
import boto3

# S3 connection settings for the landing bucket, all read from the environment
# (a missing variable raises KeyError here).
s3_settings = {
    'service_name': "s3",
    'endpoint_url': os.environ['S3_LANDING_ENDPOINT'],
    'aws_access_key_id': os.environ['S3_LANDING_ACCESS_KEY'],
    'aws_secret_access_key': os.environ['S3_LANDING_SECRET_KEY'],
}
s3_resource = boto3.resource(**s3_settings)

Create a simple data frame for testing

In [None]:
import io

# Download the 2020q4 submissions file from the landing bucket into memory.
buffer = io.BytesIO()
sub_file = s3_resource.Object(os.environ['S3_LANDING_BUCKET'], 'SEC-DERA/2020q4/sub.txt')
sub_file.download_fileobj(buffer)
buffer.seek(0)  # rewind so read_csv starts at the first downloaded byte

# Only the first 20 rows of the tab-separated file are used by this sample.
df = pd.read_csv(buffer, header=0, sep='\t', nrows=20, engine='c')

# Split into two 10-row data sets. Explicit copies are taken because a column
# is assigned to each frame later; assigning into an iloc slice triggers
# pandas' SettingWithCopyWarning.
new_df = df.iloc[10:20].copy()
df = df.iloc[0:10].copy()

# Add a unique identifier to the data set
uid = str(uuid.uuid4())
df['uuid'] = uid
df = df.convert_dtypes()
# Display the first data set
df


Create custom metadata and declare variables for the schema and table of the data set

In [None]:
# Display the column labels of the first data set (including the added 'uuid' column).
df.columns

In [None]:
# Dataset-level metadata plus per-column descriptions for the data set.
# The top-level keys other than 'fields' describe the data set as a whole.
custom_meta_content = {
    'dataset_key': 'SEC-DERA',
    'title': 'SEC DERA Disclosures',
    'description': 
    '''The DERA Financial Statement Data Sets provide numeric information from the face financials of all financial statements.
    
    This data is extracted from exhibits to corporate financial reports filed with the Commission using eXtensible Business Reporting Language (XBRL).  As compared to the more extensive Financial Statement and Notes Data Sets, which provide the numeric and narrative disclosures from all financial statements and their notes, the Financial Statement Data Sets are more compact.''',
    'version': '2020q4',
    'release_date': '20201231',
    # 'fields' is a one-element list whose single dict maps each column name
    # to its human-readable description.
    'fields': [
    {
        'adsh':'Accession Number. The 20-character string formed from the 18-digit number assigned by the SEC to each EDGAR submission.',
        'cik':'Central Index Key (CIK). Ten digit number assigned by the SEC to each registrant that submits filings.',
        'name':'Name of registrant. This corresponds to the name of the legal entity as recorded in EDGAR as of the filing date.',
        'sic':'Standard Industrial Classification (SIC). Four digit code assigned by the SEC as of the filing date, indicating the registrant’s type of business.',
        'countryba':'The ISO 3166-1 country of the registrant’s business address.',
        'stprba':'The state or province of the registrant’s business address, if field countryba is US or CA.',
        'cityba':'The city of the registrant’s business address.',
        'zipba':'The zip code of the registrant’s business address.',
        'bas1':'The first line of the street of the registrant’s business address.',
        'bas2':'The second line of the street of the registrant’s business address.',
        'baph':'The phone number of the registrant’s business address.',
        'countryma':'The ISO 3166-1 country of the registrant’s mailing address.',
        'stprma':'The state or province of the registrant’s mailing address, if field countryma is US or CA.',
        'cityma':'The city of the registrant’s mailing address.',
        'zipma':'The zip code of the registrant’s mailing address.',
        'mas1':'The first line of the street of the registrant’s mailing address.',
        'mas2':'The second line of the street of the registrant’s mailing address.',
        'countryinc':'The country of incorporation for the registrant.',
        'stprinc':'The state or province of incorporation for the registrant, if countryinc is US or CA.',
        'ein':'Employee Identification Number, 9 digit identification number assigned by the Internal Revenue Service to business entities operating in the United States.',
        'former':'Most recent former name of the registrant, if any.',
        'changed':'Date of change from the former name, if any.',
        'afs':'Filer status with the SEC at the time of submission:\n\
1-LAF=Large Accelerated,\n\
2-ACC=Accelerated,\n\
3-SRA=Smaller Reporting Accelerated,\n\
4-NON=Non-Accelerated,\n\
5-SML=Smaller Reporting Filer,\n\
NULL=not assigned.',
        'wksi':'Well Known Seasoned Issuer (WKSI). An issuer that meets specific SEC requirements at some point during a 60-day period preceding the date the issuer satisfies its obligation to update its shelf registration statement.',
        'fye':'Fiscal Year End Date, rounded to nearest month-end.',
        'form':'The submission type of the registrant’s filing.',
        'period':'Balance Sheet Date, rounded to nearest month-end.',
        'fy':'Fiscal Year Focus (as defined in EFM Ch. 6).',
        'fp':'Fiscal Period Focus (as defined in EFM Ch. 6) within Fiscal Year. The 10-Q for the 1st, 2nd and 3rd quarters would have a fiscal period focus of Q1, Q2 (or H1), and Q3 (or M9) respectively, and a 10-K would have a fiscal period focus of FY.',
        'filed':'The date of the registrant’s filing with the Commission.',
        'accepted':'The acceptance date and time of the registrant’s filing with the Commission. Filings accepted after 5:30pm EST are considered filed on the following business day.',
        'prevrpt':'Previous Report –TRUE indicates that the submission information was subsequently amended.',
        'detail':'TRUE indicates that the XBRL submission contains quantitative disclosures within the footnotes and schedules at the required detail level (e.g., each amount).',
        'instance':'The name of the submitted XBRL Instance Document (EX-101.INS) type data file. The name often begins with the company ticker symbol.',
        'nciks':'Number of Central Index Keys (CIK) of registrants (i.e., business units) included in the consolidating entity’s submitted filing.',
        'aciks':'Additional CIKs of co-registrants included in  a consolidating entity’s EDGAR submission, separated by spaces. If there are no other co-registrants (i.e., nciks=1), the value of aciks is NULL.  For a very small number of filers, the list of co-registrants is too long to fit in the field.  Where this is the case, PARTIAL will appear at the end of the list indicating that not all co-registrants’ CIKs are included in the field; users should refer to the complete submission file for all CIK information.'
    }]
}
# Schema and table names used when building the Trino statements for this data set.
schemaname = 'sec_dera'
tablename = 'sub'

Convert custom metadata content in json format into Pandas DataFrame

In [None]:
# Flatten the 'fields' records into a frame, carrying 'dataset_key' along as
# an extra column, then let pandas pick nullable dtypes.
df_meta_fields = pd.json_normalize(
    custom_meta_content,
    record_path=['fields'],
    meta=['dataset_key'],
)
df_meta_fields = df_meta_fields.convert_dtypes()
df_meta_fields

In [None]:
# Print the full per-column summary of the fields metadata frame.
df_meta_fields.info(verbose=True)

In [None]:
# Dataset-level metadata: normalize only the top level of the dict, drop the
# nested 'fields' column, and record the target schema name.
df_meta_table = (
    pd.json_normalize(custom_meta_content, max_level=0)
    .drop(columns='fields')
    .assign(schema=schemaname)
    .convert_dtypes()
)
df_meta_table

In [None]:
# Print the full per-column summary of the dataset-level metadata frame.
df_meta_table.info(verbose=True)

Open a Trino connection using JWT for authentication

In [None]:
# Connection parameters come from the environment (KeyError if one is missing).
trino_user = os.environ['TRINO_USER']
trino_host = os.environ['TRINO_HOST']
trino_port = os.environ['TRINO_PORT']
sqlstring = f'trino://{trino_user}@{trino_host}:{trino_port}/'

# TRINO_PASSWD is handed to JWTAuthentication as the token; traffic goes over https.
sqlargs = dict(
    auth=trino.auth.JWTAuthentication(os.environ['TRINO_PASSWD']),
    http_scheme='https',
)
engine = create_engine(sqlstring, connect_args=sqlargs)
print(f"connecting with engine {engine}")
connection = engine.connect()

In [None]:
# Quick connectivity check: list the schemas of the osc_datacommons_iceberg_dev catalog.
# The catalog name is spelled out here because `catalogname` is only assigned in the next cell.
engine.execute('show schemas in osc_datacommons_iceberg_dev').fetchall()


In [None]:
# Catalog that all statements below are issued against.
catalogname = 'osc_datacommons_iceberg_dev'

# Listing the catalog's schemas confirms the Trino connection is set up correctly.
show_schemas_sql = f'show schemas in {catalogname}'
schema_read = engine.execute(show_schemas_sql)
for schema_row in schema_read.fetchall():
    print(schema_row)

Create ingestion schema based on source data name and remove old tables if necessary

In [None]:
# Make sure the ingestion schema exists; print whatever rows the statement returns.
create_schema_sql = f'create schema if not exists {catalogname}.{schemaname}'
schema_check = engine.execute(create_schema_sql)
for result_row in schema_check.fetchall():
    print(result_row)

In [None]:
# List the tables currently in the ingestion schema. The name is built from
# catalogname/schemaname so this cell follows any change to those variables.
engine.execute(f'show tables in {catalogname}.{schemaname}').fetchall()

In [None]:
# Remove any table left over from a previous run so the sample starts clean.
drop_table_sql = f'drop table if exists {catalogname}.{schemaname}.{tablename}'
table_check = engine.execute(drop_table_sql)
for result_row in table_check.fetchall():
    print(result_row)

In [None]:
# Column definitions for the DDL come from the osc_ingest_trino helper
# (presumably one "name type" pair per data frame column; verify against the helper).
schema = create_table_schema_pairs(df)

# Parquet-backed table partitioned on the per-data-set 'uuid' column.
tabledef = f"""
create table if not exists {catalogname}.{schemaname}.{tablename} (
{schema}
) with (
    format = 'parquet',
    partitioning = ARRAY['uuid']
)
"""
print(tabledef)

In [None]:
# Run the CREATE TABLE statement built in the previous cell.
table_create = engine.execute(tabledef)
created_rows = table_create.fetchall()
for result_row in created_rows:
    print(result_row)

In [None]:
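# Appending the data frame with pandas would be the simplest way to load the new Trino table.
# This statement should work, but it fails with TrinoUserError: "This connector does not support creating tables",
# so the rows are inserted with explicit INSERT statements in the cells below.
# df.to_sql('gppd', con=engine, schema='wri_test', if_exists='append')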

In [None]:
# Convert the first data set into a list of row lists, one inner list per row.
list_values = df.values.tolist()
# Display the first row as a sanity check.
list_values[0]

In [None]:
def quote_appropriately(x):
    """Render a single Python value as a SQL literal for a VALUES clause.

    Args:
        x: One cell value taken from a data frame row.

    Returns:
        str: For ``str`` input, the text wrapped in single quotes with every
        embedded single quote doubled (the standard SQL escape, which keeps
        the stored text unchanged). For ``int``/``float`` input, ``str(x)``.
        ``NULL`` for a float NaN and for every other type (e.g. ``None``).
    """
    if isinstance(x, str):
        escaped = x.replace("'", "''")
        return f"'{escaped}'"
    # NaN is the only float that is not equal to itself; "nan" is not a valid
    # SQL numeric literal, so treat it as a missing value.
    if isinstance(x, float) and x != x:
        return 'NULL'
    if isinstance(x, (int, float)):
        return str(x)
    return 'NULL'

# Insert the first data set one row at a time: each row becomes its own
# INSERT statement, which is printed before it is executed.
list_length = len(list_values)
insert_prefix = 'INSERT INTO {cname}.{sname}.{tname} \n    VALUES '.format(
    cname=catalogname, sname=schemaname, tname=tablename
)
for row_values in list_values:
    quoted_cells = [quote_appropriately(cell) for cell in row_values]
    joined_values = '(' + ','.join(quoted_cells) + ')'
    insert_statement = insert_prefix + joined_values
    print(insert_statement)
    run_statement = engine.execute(insert_statement)
    for result_row in run_statement.fetchall():
        print(result_row)

In [None]:
# Read back up to ten rows of the freshly loaded table. The query text is kept
# in dataset_query because later cells re-run it.
dataset_query = f'SELECT * FROM {catalogname}.{schemaname}.{tablename} limit 10'
print(dataset_query)
dataset = engine.execute(dataset_query)
for record in dataset.fetchall():
    print(record)

Query Iceberg snapshots for the SEC DERA data set. Snapshots provide an immutable view of the data at a given point in time. They are created automatically on every append or removal of data.

In [None]:
# List the table's snapshots via the "<table>$snapshots" metadata table. The
# query text is kept in snapshot_query because a later cell re-runs it.
snapshot_query = (
    f'SELECT committed_at, snapshot_id, parent_id '
    f'FROM {catalogname}.{schemaname}."{tablename}$snapshots"'
)
print(snapshot_query)
dataset = engine.execute(snapshot_query)
for snapshot_row in dataset.fetchall():
    print(snapshot_row)

Create metadata table for schema / dataset level information

In [None]:
# declare variable names for metadata structure in Trino
meta_schema_name = 'metastore_iceberg'
meta_table_name_dataset = 'meta_tables_iceberg'
meta_table_name_fields = 'meta_fields_iceberg'

In [None]:
# Make sure the metadata schema exists in the catalog.
create_meta_schema_sql = f'create schema if not exists {catalogname}.{meta_schema_name}'
schema_check = engine.execute(create_meta_schema_sql)
for result_row in schema_check.fetchall():
    print(result_row)

In [None]:
# Drop any dataset-level metadata table left over from a previous run.
drop_meta_table_sql = f'drop table if exists {catalogname}.{meta_schema_name}.{meta_table_name_dataset}'
table_check = engine.execute(drop_meta_table_sql)
for result_row in table_check.fetchall():
    print(result_row)

In [None]:
# Column definitions for the dataset-level metadata table come from the
# osc_ingest_trino helper, applied to the metadata frame.
schema_meta_table = create_table_schema_pairs(df_meta_table)

# Parquet-backed table partitioned on 'dataset_key'.
tabledef = f"""
create table if not exists {catalogname}.{meta_schema_name}.{meta_table_name_dataset} (
{schema_meta_table}
) with (
    format = 'parquet',
    partitioning = ARRAY['dataset_key']
)
"""
print(tabledef)

In [None]:
# Run the CREATE TABLE statement for the dataset-level metadata table.
meta_table_create = engine.execute(tabledef)
created_rows = meta_table_create.fetchall()
for result_row in created_rows:
    print(result_row)

In [None]:
# Convert the dataset-level metadata frame into a list of row lists.
list_values_meta_table = df_meta_table.values.tolist()
# Display the first row as a sanity check.
list_values_meta_table[0]

In [None]:
# Build the VALUES tuple for the first dataset-level metadata row with the
# shared quoting helper instead of hand-concatenated quotes, so every value is
# quoted the same way as in the other INSERT cells and a value containing a
# quote cannot break the statement.
joined_values = '(' + ', '.join(quote_appropriately(x) for x in list_values_meta_table[0]) + ')'
# Use catalogname, like the cells that created this table, instead of a
# hard-coded catalog.
insert_statement = (
    f'INSERT INTO {catalogname}.{meta_schema_name}.{meta_table_name_dataset}\n'
    f'VALUES '
) + joined_values
print(insert_statement)
run_statement = engine.execute(insert_statement)
for row in run_statement.fetchall():
    print(row)

In [None]:
# Read back the dataset-level metadata. The catalog comes from catalogname so
# the query targets the same catalog the table was created in.
meta_query_table = f'SELECT * FROM {catalogname}.{meta_schema_name}.{meta_table_name_dataset} limit 10'
print(meta_query_table)
meta_table_query = engine.execute(meta_query_table)
for row in meta_table_query.fetchall():
    print(row)

Create metadata table for fields information

In [None]:
# Drop any field-level metadata table left over from a previous run. The
# catalog comes from catalogname rather than a hard-coded literal.
table_check = engine.execute(
    f'drop table if exists {catalogname}.{meta_schema_name}.{meta_table_name_fields}'
)
for row in table_check.fetchall():
    print(row)

In [None]:
# Column definitions for the field-level metadata table come from the
# osc_ingest_trino helper, applied to the fields frame.
schema_meta_fields = create_table_schema_pairs(df_meta_fields)

# Parquet-backed table partitioned on 'dataset_key'. The catalog comes from
# catalogname, matching the other CREATE TABLE cells.
tabledef = """
create table if not exists {cname}.{sname}.{tname} (
{schema}
) with (
    format = 'parquet',
    partitioning = ARRAY['dataset_key']
)
""".format(cname=catalogname, schema=schema_meta_fields, sname=meta_schema_name, tname=meta_table_name_fields)
print(tabledef)

In [None]:
# Run the CREATE TABLE statement for the field-level metadata table.
meta_fields_create = engine.execute(tabledef)
created_rows = meta_fields_create.fetchall()
for result_row in created_rows:
    print(result_row)

In [None]:
# Convert the field-level metadata frame into a list of row lists.
list_values_meta_fields = df_meta_fields.values.tolist()
# Display the first row as a sanity check.
list_values_meta_fields[0]

In [None]:
# Insert the field-level metadata one row at a time; each statement is printed
# before it is executed. The catalog comes from catalogname rather than a
# hard-coded literal, and the rows are iterated directly instead of by index.
list_fields_length = len(list_values_meta_fields)
for field_row in list_values_meta_fields:
    joined_values = '(' + ','.join(quote_appropriately(x) for x in field_row) + ')'
    insert_statement = (
        f'INSERT INTO {catalogname}.{meta_schema_name}.{meta_table_name_fields} \n'
        f'    VALUES '
    ) + joined_values
    print(insert_statement)
    run_statement = engine.execute(insert_statement)
    for row in run_statement.fetchall():
        print(row)

In [None]:
# Read back the field-level metadata. The catalog comes from catalogname so
# the query targets the same catalog the table was created in.
meta_query_fields = f'SELECT * FROM {catalogname}.{meta_schema_name}.{meta_table_name_fields} limit 10'
print(meta_query_fields)
meta_fields_query = engine.execute(meta_query_fields)
for row in meta_fields_query.fetchall():
    print(row)

Update the source data to create a new data set for ingestion

In [None]:
# Prepare the second data set for ingestion.

# new_df was already sliced from the source file when the first data set was
# created, because it's a pain to wait to read the file again here:
# new_df = pd.read_csv(buffer, header=None, sep='\t', nrows=10, engine='c')
# new_df.columns = df.columns

# Add a unique identifier to the data set. assign() returns a new frame, which
# avoids pandas' SettingWithCopyWarning when new_df is still a slice of the
# frame it was cut from.
uid = str(uuid.uuid4())
new_df = new_df.assign(uuid=uid)
new_df = new_df.convert_dtypes()
# Display the second data set
new_df

In [None]:
# Convert the second data set into a list of row lists, one inner list per row.
new_list_values = new_df.values.tolist()
# Display the first row as a sanity check.
new_list_values[0]

Ingest new data set

In [None]:
# Insert the second data set one row at a time; each statement is printed
# before it is executed. The catalog comes from catalogname, matching the cell
# that loaded the first data set, and the rows are iterated directly instead
# of by index.
list_length = len(new_list_values)
for new_row in new_list_values:
    joined_values = '(' + ','.join(quote_appropriately(x) for x in new_row) + ')'
    insert_statement = (
        f'INSERT INTO {catalogname}.{schemaname}.{tablename} \n'
        f'    VALUES '
    ) + joined_values
    print(insert_statement)
    run_statement = engine.execute(insert_statement)
    for row in run_statement.fetchall():
        print(row)

Query data and Iceberg snapshot

In [None]:
# Re-run the table query after the second data set has been inserted.
print(dataset_query)
dataset = engine.execute(dataset_query)
current_rows = dataset.fetchall()
for record in current_rows:
    print(record)

In [None]:
# Re-run the snapshot listing after the second data set has been inserted.
print(snapshot_query)
dataset = engine.execute(snapshot_query)
snapshot_rows = dataset.fetchall()
for snapshot_row in snapshot_rows:
    print(snapshot_row)

Query only the first data set (Iceberg time travel)

In [None]:
# Snapshot to read from, addressed with the "<table>@<snapshot_id>" syntax.
# NOTE(review): this id looks like it was copied from one particular run;
# confirm it against the snapshot_id values printed by the snapshot query and
# replace it when the table has been recreated.
first_snapshot_id = 6968266386201395358

# The catalog comes from catalogname rather than a hard-coded literal.
past_dataset_query = f'SELECT * FROM {catalogname}.{schemaname}."{tablename}@{first_snapshot_id}"'
print(past_dataset_query)
dataset = engine.execute(past_dataset_query)
for row in dataset.fetchall():
    print(row)

Roll back the second data set

In [None]:
# Snapshot to roll the table back to.
# NOTE(review): this id looks like it was copied from one particular run;
# confirm it against the snapshot_id values printed by the snapshot query and
# replace it when the table has been recreated.
rollback_snapshot_id = 6968266386201395358

# The catalog comes from catalogname rather than a hard-coded literal.
rollback_request = (
    f"CALL {catalogname}.system.rollback_to_snapshot("
    f"'{schemaname}', '{tablename}', {rollback_snapshot_id})"
)
print(rollback_request)
dataset = engine.execute(rollback_request)
for row in dataset.fetchall():
    print(row)

Query the full table again; the second data set has been rolled back

In [None]:
# Re-run the table query after the rollback call.
print(dataset_query)
dataset = engine.execute(dataset_query)
remaining_rows = dataset.fetchall()
for record in remaining_rows:
    print(record)