In [1]:
# pip install boto3
# pip install python-dotenv
# pip install trino
# pip install pandas
# pip install pyarrow

In [2]:
import boto3
import pandas as pd
import io

Load environment variables from `credentials.env` (if the file exists)

In [3]:
from dotenv import dotenv_values, load_dotenv
import os
import pathlib

# Locate credentials.env: an explicit CREDENTIAL_DOTENV_DIR wins, otherwise
# fall back to $PWD, and finally to a fixed default directory.
fallback_dir = os.environ.get('PWD', '/opt/app-root/src')
dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', fallback_dir)
dotenv_path = pathlib.Path(dotenv_dir).joinpath('credentials.env')

# The file is optional; when present, its values replace any variables
# already set in the process environment (override=True).
if os.path.exists(dotenv_path):
    load_dotenv(
        dotenv_path=dotenv_path,
        override=True,
    )

Create an S3 resource from the AWS credentials in the environment, then retrieve the configured bucket and list its Parquet objects

In [4]:
# S3 resource built from credentials in the environment (possibly loaded from
# credentials.env above). A session token is only present for temporary
# credentials, so it is optional: .get() yields None when it is unset, and
# boto3 then signs requests with the access key pair alone instead of this
# cell failing with a KeyError.
s3_resource = boto3.resource(
    service_name="s3",
    aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
    aws_session_token=os.environ.get('AWS_SESSION_TOKEN'),
)

In [5]:
# Handle to the bucket named by AWS_S3_BUCKET.
bucket = s3_resource.Bucket(os.environ['AWS_S3_BUCKET'])

# Keys of every Parquet object under the PUDL export prefix. The suffix check
# includes the dot so that keys merely ending in the letters "parquet"
# (without a .parquet extension) are not picked up.
PUDL_PARQUET_PREFIX = 'pudl-0.4.0/pudl_data/parquet/'
files = [
    obj.key
    for obj in bucket.objects.filter(Prefix=PUDL_PARQUET_PREFIX)
    if obj.key.endswith('.parquet')
]
files[:10]

['pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=AL/0bd3b8d4be104c6c9b4531fff26f0671.parquet',
 'pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=FL/a3c8fed0646345c5a959862f6307112e.parquet',
 'pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=GA/7f6fa554bac640bc8ac6b4f7698f1e22.parquet',
 'pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=IA/8cae277530824844ad4f54b7c88e8813.parquet',
 'pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=IL/4854d33914df43cf9b45527d1c56879c.parquet',
 'pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=IN/2f40504880c74aea9884d2e7576ef89e.parquet',
 'pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=KS/9aed897519164b20b8346c15353d892d.parquet',
 'pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=KY/793ec765eb474844a92bf2821b4f9d78.parquet',
 'pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=MA/e8044aec4c784e3e84031b233a32e037.parquet',
 'pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=MD/58b58c25c07b42e3bec1313f7d15cf88.parquet']

Read the first Parquet file into a pandas DataFrame and view the results

In [6]:
# Download the first listed Parquet object into memory and parse it.
buffer = io.BytesIO()
parquet_file = s3_resource.Object(os.environ['AWS_S3_BUCKET'], files[0])
parquet_file.download_fileobj(buffer)
# download_fileobj writes into the stream and may leave it positioned away
# from the start; rewind explicitly instead of relying on the Parquet reader
# to seek back on its own.
buffer.seek(0)
df = pd.read_parquet(buffer)
df

Unnamed: 0,plant_id_eia,unitid,operating_datetime_utc,operating_time_hours,gross_load_mw,steam_load_1000_lbs,so2_mass_lbs,so2_mass_measurement_code,nox_rate_lbs_mmbtu,nox_rate_measurement_code,nox_mass_lbs,nox_mass_measurement_code,co2_mass_tons,co2_mass_measurement_code,heat_content_mmbtu,facility_id,unit_id_epa
0,7,1,1995-01-01 06:00:00+00:00,0.0,0.0,,,,,,,,,,0.000000,,
1,7,1,1995-01-01 07:00:00+00:00,0.0,0.0,,,,,,,,,,0.000000,,
2,7,1,1995-01-01 08:00:00+00:00,0.0,0.0,,,,,,,,,,0.000000,,
3,7,1,1995-01-01 09:00:00+00:00,0.0,0.0,,,,,,,,,,0.000000,,
4,7,1,1995-01-01 10:00:00+00:00,0.0,0.0,,,,,,,,,,0.000000,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
122635,56,3,1996-01-01 01:00:00+00:00,1.0,198.0,,2030.500000,Measured,0.514,Measured,985.749023,Calculated,196.800003,Measured,1917.800049,11,55
122636,56,3,1996-01-01 02:00:00+00:00,1.0,176.0,,1629.599976,Measured,0.534,Measured,906.731995,Calculated,174.199997,Measured,1698.000000,11,55
122637,56,3,1996-01-01 03:00:00+00:00,1.0,167.0,,1495.400024,Measured,0.539,Measured,869.083984,Calculated,165.399994,Measured,1612.400024,11,55
122638,56,3,1996-01-01 04:00:00+00:00,1.0,174.0,,1630.800049,Measured,0.536,Measured,892.921997,Calculated,170.899994,Measured,1665.900024,11,55


In [None]:
import trino

# Credentials and endpoint come from the environment. The connection uses
# HTTPS with certificate verification enabled.
trino_auth = trino.auth.BasicAuthentication(
    os.environ['TRINO_USER'],
    os.environ['TRINO_PASSWD'],
)
conn = trino.dbapi.connect(
    auth=trino_auth,
    host=os.environ['TRINO_HOST'],
    port=int(os.environ['TRINO_PORT']),
    http_scheme='https',
    verify=True,
)
# Single cursor reused by the DDL cells below.
cur = conn.cursor()

In [None]:
# Ensure the target schema exists, then consume the statement's result.
create_schema_sql = 'create schema if not exists demo1.epacems'
cur.execute(create_schema_sql)
cur.fetchall()

In [None]:
# Table for year=1995 / state=AL. Drop any previous definition so the
# following create statement starts from a clean slate.
tablename = 'Y95_AL'
drop_table_sql = 'drop table if exists demo1.epacems.' + tablename
cur.execute(drop_table_sql)
cur.fetchall()

In [None]:
# pandas dtype name -> Trino Hive column type. Anything not listed here
# (object, string, category, ...) falls back to varchar.
PANDAS_TO_TRINO = {
    'int8': 'tinyint', 'Int8': 'tinyint',
    'int16': 'smallint', 'Int16': 'smallint',
    'int32': 'integer', 'Int32': 'integer',
    'int64': 'bigint', 'Int64': 'bigint',
    'float32': 'real', 'Float32': 'real',
    'float64': 'double', 'Float64': 'double',
    'bool': 'boolean', 'boolean': 'boolean',
}


def trino_column_type(dtype) -> str:
    """Return the Trino column type used for a pandas dtype.

    Datetime dtypes (naive or tz-aware) map to plain ``timestamp``.
    NOTE(review): Hive connector tables do not accept
    ``timestamp with time zone`` -- confirm against the deployed Trino version.
    """
    if pd.api.types.is_datetime64_any_dtype(dtype):
        return 'timestamp'
    return PANDAS_TO_TRINO.get(str(dtype), 'varchar')


def trino_columns_ddl(frame: pd.DataFrame) -> str:
    """Return the column list of a CREATE TABLE body, one quoted column per line."""
    return ',\n'.join(
        '    "{}" {}'.format(str(name).replace('"', '""'), trino_column_type(dtype))
        for name, dtype in frame.dtypes.items()
    )


# Trino's Hive connector requires external_location to be a directory, so the
# table points at the partition folder rather than at the single object in it.
# NOTE(review): assumes `df` was read from a file in this same folder (files[0]
# is the year=1995/state=AL object in the listing above), so its columns match
# what Trino will read -- re-check if files[0] changes.
table_location = (
    's3a://{bucket}/pudl-0.4.0/pudl_data/parquet/epacems/year=1995/state=AL/'
    .format(bucket=os.environ['AWS_S3_BUCKET'])
)

# Same schema that the create-schema and drop-table cells use.
tabledef = """create table if not exists demo1.epacems.{tname}(
{schema}
) with (
    format = 'parquet',
    external_location = '{location}'
)""".format(tname=tablename, schema=trino_columns_ddl(df), location=table_location)
print(tabledef)

# tables created externally may not show up immediately in cloud-beaver
cur.execute(tabledef)
cur.fetchall()