## Loading package dependencies

The `sqlalchemy-trino` package currently requires `sqlalchemy==1.3`.
This requirement may be lifted in the future.

`%pip install` commands need only be run once per JupyterHub session.
If you restart your JupyterHub server, you must run them again.

Notebook dependencies may be pre-installed on custom notebook images in future iterations.

In [1]:
%pip install trino python-dotenv
%pip install --upgrade sqlalchemy==1.3 sqlalchemy-trino

You should consider upgrading via the '/opt/app-root/bin/python3.8 -m pip install --upgrade pip' command.
Note: you may need to restart the kernel to use updated packages.
You should consider upgrading via the '/opt/app-root/bin/python3.8 -m pip install --upgrade pip' command.
Note: you may need to restart the kernel to use updated packages.


In [2]:
from dotenv import load_dotenv
import os
import pathlib

dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', os.environ.get('PWD', '/opt/app-root/src'))
dotenv_path = pathlib.Path(dotenv_dir) / 'credentials.env'
if dotenv_path.exists():
    load_dotenv(dotenv_path=dotenv_path, override=True)
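The cells below read their credentials directly from `os.environ`, so a missing entry in `credentials.env` only surfaces later as a `KeyError` in the middle of a cell. A minimal sketch of an up-front check follows; the variable names are the ones used in this notebook, while `missing_env_vars` is a hypothetical helper, not part of `python-dotenv`.

```python
import os

# Environment variables read by the cells in this notebook.
REQUIRED_VARS = [
    "TRINO_USER", "TRINO_HOST", "TRINO_PORT", "TRINO_PASSWD",
    "S3_LANDING_ENDPOINT", "S3_LANDING_ACCESS_KEY",
    "S3_LANDING_SECRET_KEY", "S3_LANDING_BUCKET",
]

def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty (hypothetical helper)."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]

# Report problems early instead of failing halfway through a later cell.
missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("missing credentials: " + ", ".join(missing))
```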

SQLAlchemy DB connection through Trino

In [19]:
import trino
from osc_ingest_trino import *
from sqlalchemy.engine import create_engine

sqlstring = 'trino://{user}@{host}:{port}/'.format(
    user = os.environ['TRINO_USER'],
    host = os.environ['TRINO_HOST'],
    port = os.environ['TRINO_PORT']
)
sqlargs = {
    'auth': trino.auth.JWTAuthentication(os.environ['TRINO_PASSWD']),
    'http_scheme': 'https'
}
engine = create_engine(sqlstring, connect_args = sqlargs)
print("connecting with engine " + str(engine))
connection = engine.connect()

connecting with engine Engine(trino://MichaelTiemannOSC@trino-secure-odh-trino.apps.odh-cl1.apps.os-climate.org:443/)


Show the available Trino catalogs

In [4]:
schema_read = engine.execute("show catalogs")
for row in schema_read.fetchall():
    print(row)

('jmx',)
('osc_datacommons_dev',)
('osc_datacommons_iceberg_dev',)
('osc_datacommons_prod',)
('system',)


In [5]:
import boto3

s3_resource = boto3.resource(
    service_name="s3",
    endpoint_url=os.environ['S3_LANDING_ENDPOINT'],
    aws_access_key_id=os.environ['S3_LANDING_ACCESS_KEY'],
    aws_secret_access_key=os.environ['S3_LANDING_SECRET_KEY'],
)

Load the national NAICS file from the S3 landing bucket into a data frame

In [11]:
import io
import uuid

import pandas as pd

buffer = io.BytesIO()
sub_file = s3_resource.Object(os.environ['S3_LANDING_BUCKET'],'EPA/national_combined-20211104/NATIONAL_NAICS_FILE.CSV')
sub_file.download_fileobj(buffer)
buffer.seek(0)
df = pd.read_csv(buffer, header=0, sep=',', engine='c')

# Tag every row with a single UUID identifying this ingestion run
uid = str(uuid.uuid4())
df['uuid'] = uid
# Convert columns to the best available nullable dtypes, then display the data frame
df = df.convert_dtypes()
df

| | REGISTRY_ID | PGM_SYS_ACRNM | PGM_SYS_ID | INTEREST_TYPE | NAICS_CODE | PRIMARY_INDICATOR | CODE_DESCRIPTION | uuid |
|---|---|---|---|---|---|---|---|---|
| 0 | 110000491735 | EIS | 12663611 | AIR EMISSIONS CLASSIFICATION UNKNOWN | 424710 | PRIMARY | PETROLEUM BULK STATIONS AND TERMINALS. | bcc42ceb-5c03-4632-a21e-70ba5b4aff75 |
| 1 | 110000491735 | TRIS | 99501TSRLS1522P | TRI REPORTER | 424710 | PRIMARY | PETROLEUM BULK STATIONS AND TERMINALS. | bcc42ceb-5c03-4632-a21e-70ba5b4aff75 |
| 2 | 110000491735 | AIRS/AFS | 0202000032 | AIR MAJOR | 424710 | PRIMARY | PETROLEUM BULK STATIONS AND TERMINALS. | bcc42ceb-5c03-4632-a21e-70ba5b4aff75 |
| 3 | 110000491735 | AIR | AK0000000202000032 | AIR MAJOR | 424710 | PRIMARY | PETROLEUM BULK STATIONS AND TERMINALS. | bcc42ceb-5c03-4632-a21e-70ba5b4aff75 |
| 4 | 110000491735 | ICIS | 2600029861 | ENFORCEMENT/COMPLIANCE ACTIVITY | 424720 | PRIMARY | PETROLEUM AND PETROLEUM PRODUCTS MERCHANT WHOL... | bcc42ceb-5c03-4632-a21e-70ba5b4aff75 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2025044 | 110071133944 | RCRAINFO | WYR000220772 | UNSPECIFIED UNIVERSE | 713910 | PRIMARY | GOLF COURSES AND COUNTRY CLUBS. | bcc42ceb-5c03-4632-a21e-70ba5b4aff75 |
| 2025045 | 110071133945 | RCRAINFO | WYR000220780 | UNSPECIFIED UNIVERSE | 811111 | PRIMARY | GENERAL AUTOMOTIVE REPAIR. | bcc42ceb-5c03-4632-a21e-70ba5b4aff75 |
| 2025046 | 110071133946 | RCRAINFO | WYR000220798 | SQG | 811111 | PRIMARY | GENERAL AUTOMOTIVE REPAIR. | bcc42ceb-5c03-4632-a21e-70ba5b4aff75 |
| 2025047 | 110071133948 | RCRAINFO | WYR000220814 | LQG | 486210 | PRIMARY | PIPELINE TRANSPORTATION OF NATURAL GAS. | bcc42ceb-5c03-4632-a21e-70ba5b4aff75 |
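`download_fileobj` writes the S3 object into the buffer sequentially, leaving the stream positioned at its end; that is why the cell rewinds with `buffer.seek(0)` before handing the buffer to `pd.read_csv`. The stdlib-only illustration below uses `csv` in place of pandas and a one-row sample copied from the table above; it is a sketch of the buffer behaviour, not of the S3 download itself.

```python
import csv
import io

# Stand-in for download_fileobj: write CSV bytes into an in-memory buffer.
buffer = io.BytesIO()
buffer.write(b"REGISTRY_ID,NAICS_CODE\n110000491735,424710\n")

# The position is now at the end, so reading yields nothing.
after_write = buffer.read()

# Rewind to the first byte, then parse the buffer as text.
buffer.seek(0)
rows = list(csv.reader(io.TextIOWrapper(buffer, encoding="utf-8", newline="")))
print(rows)
```

Without the `seek(0)` call, `pd.read_csv` would see an empty stream and fail rather than return a data frame.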


In [13]:
catalogname = "osc_datacommons_dev"
schemaname = 'epa_frs'
tablename = 'naics'

Create the ingestion schema (named after the data source) and drop the old table if it exists

In [14]:
schema_check = engine.execute(f'create schema if not exists {catalogname}.{schemaname}')
for row in schema_check.fetchall():
    print(row)

(True,)


In [16]:
engine.execute(f'show tables in {catalogname}.{schemaname}').fetchall()

[]

In [17]:
table_check = engine.execute(f'drop table if exists {catalogname}.{schemaname}.{tablename}')
for row in table_check.fetchall():
    print(row)

(True,)
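These cells splice the catalog, schema and table names into SQL with f-strings. That is harmless for the constants set above, but if the names ever come from configuration or user input, validating them first is a cheap safeguard. A minimal sketch follows; `qualified_name` is a hypothetical helper, and the identifier pattern is a deliberately conservative assumption that is stricter than Trino's own rules.

```python
import re

# Conservative pattern for unquoted identifiers (an assumption, not Trino's full grammar).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def qualified_name(*parts):
    """Join catalog/schema/table names after checking each one (hypothetical helper)."""
    for part in parts:
        if not _IDENTIFIER.fullmatch(part):
            raise ValueError(f"unsafe identifier: {part!r}")
    return ".".join(parts)

fqtn = qualified_name("osc_datacommons_dev", "epa_frs", "naics")
drop_sql = f"drop table if exists {fqtn}"
print(drop_sql)
```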


In [22]:
schema = create_table_schema_pairs(df)
tabledef = """
create table if not exists {cname}.{sname}.{tname} (
{schema}
) with (
    format = 'parquet'
)
""".format(cname=catalogname, schema=schema, sname=schemaname, tname=tablename)
print(tabledef)


create table if not exists osc_datacommons_dev.epa_frs.naics (
    REGISTRY_ID bigint,
    PGM_SYS_ACRNM varchar,
    PGM_SYS_ID varchar,
    INTEREST_TYPE varchar,
    NAICS_CODE bigint,
    PRIMARY_INDICATOR varchar,
    CODE_DESCRIPTION varchar,
    uuid varchar
) with (
    format = 'parquet'
)
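The column list above is produced by `create_table_schema_pairs` from `osc_ingest_trino`, which maps each pandas column type to a Trino type. The sketch below reproduces the same output shape with a hypothetical `TRINO_TYPES` table; it is an illustration under that assumption, not the library's actual implementation, and its type mapping may differ from the library's.

```python
# Hypothetical mapping from pandas dtype names (as produced by convert_dtypes)
# to Trino column types; osc_ingest_trino's real rules may differ.
TRINO_TYPES = {
    "Int64": "bigint",
    "Float64": "double",
    "boolean": "boolean",
    "string": "varchar",
}

def schema_pairs(columns):
    """Render (name, dtype name) pairs as the body of a CREATE TABLE statement."""
    lines = [f"    {name} {TRINO_TYPES.get(dtype, 'varchar')}" for name, dtype in columns]
    return ",\n".join(lines)

example = schema_pairs([("REGISTRY_ID", "Int64"), ("PGM_SYS_ACRNM", "string")])
print(example)
```

With a real data frame, the pairs could be built as `[(name, str(dtype)) for name, dtype in df.dtypes.items()]`; unknown types fall back to `varchar` here, which is a design choice of this sketch.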



In [23]:
table_create = engine.execute(tabledef)
for row in table_create.fetchall():
    print(row)

(True,)


In [24]:
df.to_sql('naics', con=engine, schema='epa_frs', if_exists='replace')

TrinoUserError: TrinoUserError(type=USER_ERROR, name=NOT_SUPPORTED, message="This connector does not support creating tables", query_id=20211122_211341_00422_7fwea)
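The error comes from a connector that cannot create tables, even though the fully qualified `create table` above succeeded. One plausible cause: the engine URL has an empty path, so no catalog is set, and sqlalchemy-trino appears to fall back to the `system` catalog in that case, which is where `to_sql` would then try to create `epa_frs.naics`. In addition, `if_exists='replace'` makes pandas drop the table created above and try to create it again. The sketch below builds a URL that names the catalog and schema explicitly, assuming sqlalchemy-trino reads the path as `catalog/schema`; `trino_url` is a hypothetical helper, and the user and host are placeholders.

```python
from urllib.parse import quote

def trino_url(user, host, port, catalog=None, schema=None):
    """Build a sqlalchemy-trino URL with an optional catalog/schema path (assumed format)."""
    url = f"trino://{quote(user, safe='')}@{host}:{port}/"
    if catalog:
        url += catalog
        if schema:
            url += "/" + schema
    return url

# Placeholder credentials; in the notebook these would come from os.environ.
url = trino_url("alice", "trino.example.com", 443, "osc_datacommons_dev", "epa_frs")
print(url)
```

With an engine created from such a URL (`create_engine(url, connect_args=sqlargs)`), a call like `df.to_sql(tablename, con=engine, schema=schemaname, if_exists='append', index=False)` would target the table defined above instead of dropping it; `index=False` matters because that table has no column for the data frame index. This has not been run against the cluster here.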