# Setup

In [1]:
# two possible apis to generate a trino connection:
import trino
# from pyhive import presto

# pandas dfs
import pandas as pd

import urllib3
urllib3.disable_warnings()

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [4]:
from dotenv import dotenv_values, load_dotenv
import os
import pathlib

dotenv_path = '../credentials.env'
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path,override=True)

In [8]:
JWT_TOKEN = os.environ['TRINO_PASSWD']
conn = trino.dbapi.connect(
    host=os.environ['TRINO_HOST'],
    port=int(os.environ['TRINO_PORT']),
    user=os.environ['TRINO_USER'],
    http_scheme='https',
    auth=trino.auth.JWTAuthentication(JWT_TOKEN),
    verify=True,
)
cur = conn.cursor()

# Get Catalogs

In [14]:
cur.execute('show catalogs')
all_catalogs = cur.fetchall()
all_catalogs

[['jmx'],
 ['kafka_fx'],
 ['kafka_osclimate'],
 ['oef_openclimate'],
 ['osc_datacommons_dev'],
 ['osc_datacommons_hive_ingest'],
 ['system'],
 ['uwm_prometheus']]

# Get Schemas for each Catalog

In [25]:
import traceback

# Assuming all_catalogs is a list of tuples, with each tuple containing at least one element: the catalog name.
all_data = []

for catalog in all_catalogs:
    catalog_name = catalog[0]
    print(catalog_name)
    try:
        cur.execute(f'SHOW SCHEMAS IN {catalog_name}')
        cur_schemas = cur.fetchall()
        catalog_data = {'catalog': catalog_name, 'schemas': []}
        
        for schema in cur_schemas:
            schema_name = schema[0]
            try:
                cur.execute(f'SHOW TABLES IN {catalog_name}.{schema_name}')
                cur_tables = cur.fetchall()
                # Flatten the list of tuples to a list of table names
                tables_list = [table[0] for table in cur_tables]
                schema_data = {'schema': schema_name, 'tables': tables_list}
                catalog_data['schemas'].append(schema_data)
            except Exception as e:
                print(f'Error fetching tables for schema {schema_name} in catalog {catalog_name}: {e}')
                traceback.print_exc()
                # Optionally append a schema with an error note, or pass to ignore
                # catalog_data['schemas'].append({'schema': schema_name, 'error': 'Failed to fetch tables'})

        all_data.append(catalog_data)
    except Exception as e:
        print(f'Error with catalog {catalog_name}: {e}')
        traceback.print_exc()
        # Optionally keep the catalog with an error note, or pass to ignore
        # all_data.append({'catalog': catalog_name, 'error': 'Failed to fetch schemas'})

# Now all_data contains all the information
all_data

jmx
kafka_fx
Error fetching tables for schema ecb-fx in catalog kafka_fx: TrinoUserError(type=USER_ERROR, name=SYNTAX_ERROR, message="line 1:28: mismatched input '-'. Expecting: '.', 'LIKE', <EOF>", query_id=20240404_185929_00389_xvsm6)


Traceback (most recent call last):
  File "/var/folders/gp/7f5d9bz54rgg8x8hbqxjrr2h0000gn/T/ipykernel_44574/24418238.py", line 17, in <module>
    cur.execute(f'SHOW TABLES IN {catalog_name}.{schema_name}')
  File "/Users/lucasgoh/.julia/conda/3/aarch64/lib/python3.10/site-packages/trino/dbapi.py", line 589, in execute
    self._iterator = iter(self._query.execute())
  File "/Users/lucasgoh/.julia/conda/3/aarch64/lib/python3.10/site-packages/trino/client.py", line 797, in execute
    self._result.rows += self.fetch()
  File "/Users/lucasgoh/.julia/conda/3/aarch64/lib/python3.10/site-packages/trino/client.py", line 817, in fetch
    status = self._request.process(response)
  File "/Users/lucasgoh/.julia/conda/3/aarch64/lib/python3.10/site-packages/trino/client.py", line 595, in process
    raise self._process_error(response["error"], response.get("id"))
trino.exceptions.TrinoUserError: TrinoUserError(type=USER_ERROR, name=SYNTAX_ERROR, message="line 1:28: mismatched input '-'. Expecting

kafka_osclimate
oef_openclimate
Error with catalog oef_openclimate: TrinoExternalError(type=EXTERNAL, name=JDBC_ERROR, message="Error listing schemas for catalog oef_openclimate: Could not open SSL root certificate file //.postgresql/root.crt.", query_id=20240404_185930_00396_xvsm6)
osc_datacommons_dev


Traceback (most recent call last):
  File "/var/folders/gp/7f5d9bz54rgg8x8hbqxjrr2h0000gn/T/ipykernel_44574/24418238.py", line 10, in <module>
    cur.execute(f'SHOW SCHEMAS IN {catalog_name}')
  File "/Users/lucasgoh/.julia/conda/3/aarch64/lib/python3.10/site-packages/trino/dbapi.py", line 589, in execute
    self._iterator = iter(self._query.execute())
  File "/Users/lucasgoh/.julia/conda/3/aarch64/lib/python3.10/site-packages/trino/client.py", line 797, in execute
    self._result.rows += self.fetch()
  File "/Users/lucasgoh/.julia/conda/3/aarch64/lib/python3.10/site-packages/trino/client.py", line 817, in fetch
    status = self._request.process(response)
  File "/Users/lucasgoh/.julia/conda/3/aarch64/lib/python3.10/site-packages/trino/client.py", line 595, in process
    raise self._process_error(response["error"], response.get("id"))
  File "/Users/lucasgoh/.julia/conda/3/aarch64/lib/python3.10/site-packages/trino/client.py", line 565, in _process_error
    raise exceptions.Trino

osc_datacommons_hive_ingest
system
uwm_prometheus


[{'catalog': 'jmx',
  'schemas': [{'schema': 'current', 'tables': []},
   {'schema': 'history', 'tables': []},
   {'schema': 'information_schema',
    'tables': ['applicable_roles',
     'columns',
     'enabled_roles',
     'roles',
     'schemata',
     'table_privileges',
     'tables',
     'views']}]},
 {'catalog': 'kafka_fx',
  'schemas': [{'schema': 'information_schema',
    'tables': ['applicable_roles',
     'columns',
     'enabled_roles',
     'roles',
     'schemata',
     'table_privileges',
     'tables',
     'views']},
   {'schema': 'kepler', 'tables': []},
   {'schema': 'tpch', 'tables': []}]},
 {'catalog': 'kafka_osclimate',
  'schemas': [{'schema': 'electricitymap', 'tables': []},
   {'schema': 'information_schema',
    'tables': ['applicable_roles',
     'columns',
     'enabled_roles',
     'roles',
     'schemata',
     'table_privileges',
     'tables',
     'views']}]},
 {'catalog': 'osc_datacommons_dev',
  'schemas': [{'schema': 'aicoe_osc_demo_results', 'table

In [26]:
import json

# Specify the file path
file_path = '../data/data_commons/trino_catalog_schema_table.json'

# Write the JSON data to a file
with open(file_path, 'w') as file:
    json.dump(all_data, file, indent=4)

In [27]:
# CNAME is Catalog Name (osc_datacommons_dev)
# SNAME is Schema Name (wri_gppd)
# TNAME is Table Name (plants)

import os
import pandas as pd
from pathlib import Path

def trino_to_df(cname, sname, tname):
    trino_table = '.'.join([cname, sname, tname])
    cur.execute('show columns from ' + trino_table)
    columns_df = pd.DataFrame(cur.fetchall()).dropna(axis=1,how='all')
    
    cur.execute('select * from ' + trino_table)
    df = pd.DataFrame(cur.fetchall())
    df.columns = columns_df.iloc[:, 0]
    return df

# Function to save DataFrame to CSV
def save_df_to_csv(df, cname, sname, tname):
    # Adjust the base directory to a writable path in this environment
    base_dir = '../data/data_commons'
    dir_path = os.path.join(base_dir, cname, sname)
    
    # Ensure the directory exists
    Path(dir_path).mkdir(parents=True, exist_ok=True)
    
    # Define the full file path
    file_path = os.path.join(dir_path, f'{tname}.csv')
    
    # Save the DataFrame to CSV
    df.to_csv(file_path, index=False)
    print(f'Saved to {file_path}')

# Saving all tables with 'emissions' in it

In [28]:
# Iterate over all_data
for catalog_data in all_data:
    cname = catalog_data['catalog']
    for schema_data in catalog_data['schemas']:
        sname = schema_data['schema']
        for tname in schema_data['tables']:
            if 'emission' in tname:
                try:
                    df = trino_to_df(cname, sname, tname)
                    # Adjust the path as necessary for your environment
                    save_df_to_csv(df, cname, sname, tname)
                except Exception as e:
                    print(f'Error processing {cname}.{sname}.{tname}: {e}')

Saved to ../data/data_commons/osc_datacommons_dev/demo_dv/itr_cumulative_emissions.csv
Saved to ../data/data_commons/osc_datacommons_dev/mdt_sandbox/sf_primap_hist_emissions.csv
Saved to ../data/data_commons/osc_datacommons_dev/mdt_sandbox/sf_primap_hist_emissions_source.csv
Saved to ../data/data_commons/osc_datacommons_dev/mdt_sandbox/sf_total_sovereign_emissions.csv
Saved to ../data/data_commons/osc_datacommons_dev/mdt_sandbox/sf_total_sovereign_emissions_source.csv
Saved to ../data/data_commons/osc_datacommons_dev/pcaf_sovereign_footprint/oecd_co2_emissions.csv
Saved to ../data/data_commons/osc_datacommons_dev/pcaf_sovereign_footprint/sf_primap_emissions_with_lulucf.csv
Saved to ../data/data_commons/osc_datacommons_dev/pcaf_sovereign_footprint/sf_primap_emissions_without_lulucf.csv
Saved to ../data/data_commons/osc_datacommons_dev/pcaf_sovereign_footprint/sf_total_sovereign_emissions.csv
Saved to ../data/data_commons/osc_datacommons_dev/rmi/emissions_targets.csv
Saved to ../data/dat