## Load RMI Utilities Transition Hub Data (from https://utilitytransitionhub.rmi.org/data-download/ for Data Vault Prototype)

Copyright (C) 2021 OS-Climate

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

### We have local copies of the released datasets rooted in the S3_BUCKET : RMI/RMI-202*

### The next step is to enrich with OS-C Factor metadata

Contributed by Michael Tiemann (Github: MichaelTiemannOSC)

Load Credentials

In [1]:
# From the AWS Account page, copy the export scripts from the appropriate role using the "Command Line or Programmatic Access" link
# Paste the copied text into ~/credentials.env

from dotenv import dotenv_values, load_dotenv
import os
import pathlib

# Directory holding credentials.env: $CREDENTIAL_DOTENV_DIR if set, otherwise $PWD,
# otherwise the default application root.
dotenv_dir = os.environ.get(
    'CREDENTIAL_DOTENV_DIR',
    os.environ.get('PWD', '/opt/app-root/src'),
)
dotenv_path = pathlib.Path(dotenv_dir, 'credentials.env')
# Only load the file when it is present; override=True is passed so that values
# from the file take precedence over variables already in the environment.
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path, override=True)

import osc_ingest_trino as osc
import pyarrow as pa
import pyarrow.parquet as pq
import json
import io

Create an S3 resource for the bucket holding source data

In [2]:
import boto3
# All connection settings for the landing (source) bucket are read from the
# environment; a missing variable raises KeyError right here.
s3_source = boto3.resource(
    "s3",
    endpoint_url=os.environ['S3_LANDING_ENDPOINT'],
    aws_access_key_id=os.environ['S3_LANDING_ACCESS_KEY'],
    aws_secret_access_key=os.environ['S3_LANDING_SECRET_KEY'],
)
# Handle on the bucket from which the RMI zip archives are read below
source_bucket = s3_source.Bucket(
    os.environ['S3_LANDING_BUCKET']
)

In [3]:
# Create an S3 client.  We will use it later when we write out data and metadata

trino_bucket = osc.attach_s3_bucket("S3_DEV")

In [4]:
import trino
from sqlalchemy.engine import create_engine

# Catalog and schema that receive the ingested tables
ingest_catalog = 'osc_datacommons_dev'
ingest_schema = 'sandbox'

# SQLAlchemy URL for Trino; user, host and port are taken from the environment
sqlstring = f"trino://{os.environ['TRINO_USER']}@{os.environ['TRINO_HOST']}:{os.environ['TRINO_PORT']}/"
# Connection arguments: JWT authentication over https, defaulting to the ingest catalog/schema
sqlargs = dict(
    auth=trino.auth.JWTAuthentication(os.environ['TRINO_PASSWD']),
    http_scheme='https',
    catalog=ingest_catalog,
    schema=ingest_schema,
)
engine = create_engine(sqlstring, connect_args=sqlargs)
connection = engine.connect()

# Show available schemas to ensure trino connection is set correctly
qres = engine.execute('show schemas')
display(qres.fetchall())

[('default',), ('information_schema',), ('sandbox',)]

Load RMI data file using pandas *read_csv* and appropriate dtype dictionaries

In [5]:
import numpy as np
import pandas as pd

# Current (November 20, 2021) dataset sometimes represents YEAR as floating point number.  We will read as string and fix later.

# Per-table `dtype` argument for pandas.read_csv, keyed by table name (the CSV basename).
# A value is either a {column: dtype} mapping or a single dtype string applied to every column.
# Capitalised 'Int32' / 'boolean' are pandas nullable extension dtypes (they can hold missing
# values); lower-case 'int32' is the plain NumPy dtype, which cannot.
dtype_dict = {
    # Table Name maps to dtypes (note that NaNs cannot encode to integers, so must do ex post facto fixes)
    'assets_earnings_investments':
        {'respondent_id':'int32',
         'asset_value':'float64', 'earnings_value':'float64', 'investment_value':'float64'},
    'customers_sales':
        {'respondent_id':'int32',
         'customers':'Int32',
         'sales':'float64', 'revenues':'float64'},
    'debt_equity_returns':
        {'respondent_id':'int32',
         'rate_base_actual':'float64', 'equity_actual':'float64', 'debt_actual':'float64', 
         'equity_ratio_actual':'float64', 'returns_actual':'float64', 'earnings_actual':'float64',
         'interest_actual':'float64', 'fed_tax_expense_actual':'float64',
         'pre_tax_net_income_actual':'float64', 'ROR_actual':'float64', 'ROE_actual':'float64',
         'equity_ratio':'float64', 'ROR':'float64', 'ROE':'float64',
         'interest_rate':'float64',
         'effective_fed_tax_rate':'float64', 'equity_authorized':'float64', 'debt_authorized':'float64',
         'returns_authorized':'float64', 'earnings_authorized':'float64', 'interest_authorized':'float64', 
         'interest_rate_authorized':'float64'},
    # respondent_id is nullable here; rows without one are dropped later via dropna_dict
    'emissions_targets':
        {'respondent_id':'Int32',
         'CO2_historical':'float64', 'CO2_target':'float64', 'CO2_target_all_years':'float64', 'CO2_1point5C':'float64',
         'generation_historical':'float64', 'generation_projected':'float64', 'generation_1point5C':'float64',
         'CO2_intensity_historical':'float64', 'CO2_intensity_target':'float64', 'CO2_intensity_target_all_years':'float64', 'CO2_intensity_1point5C':'float64'},
    'employees':
        {'respondent_id':'int32',
         'employees':'int32'},
    'expenditure_bills_burden':
        {'respondent_id':'int32',
         'expenditure':'float64', 'bill':'float64', 'burden':'float64'},
    # Single dtype: every column of this table is read as a string
    'expenditure_bills_burden_detail': 'string',
    'housing_units_income':
        {'respondent_id':'int32',
         'housing_units':'float64', 'income':'float64'},
    'net_plant_balance':
        {'respondent_id':'int32',
         'original_cost':'float64', 'accum_depr':'float64', 'net_plant_balance':'float64',
         'ARC':'float64', 'ARC_accum_depr':'float64', 'net_ARC':'float64'},
    'operations_emissions_by_fuel':
        {'respondent_id':'int32', 'plant_id_eia':'Int32',
         'latitude':'float64', 'longitude':'float64',
         'operating_month':'Int32', 'operating_year':'Int32',
         'retirement_month':'Int32', 'retirement_year':'Int32',
         'generation':'float64', 'fuel_consumption':'float64',
         # NOTE(review): 'emissions_c02' is spelled with a zero; presumably this mirrors the CSV header -- confirm
         'emissions_c02':'float64', 'emissions_nox':'float64', 'emissions_sox':'float64'},
    'operations_emissions_by_tech':
        {'respondent_id':'int32', 'plant_id_eia':'Int32',
         'latitude':'float64', 'longitude':'float64',
         'capacity':'float64', 'year_end_capacity':'float64', 'generation':'float64', 'potential_generation':'float64',
         'capacity_factor':'float64', 'fuel_consumption':'float64',
         'emissions_c02':'float64', 'emissions_nox':'float64', 'emissions_sox':'float64'},
    'revenue_by_tech':
        {'respondent_id':'int32',
         'revenue_total':'float64', 'revenue_residential':'float64' },
    # Single dtype: every column of this table is read as a string
    'state_targets': 'string',
    'utility_information':
        {'respondent_id':'int32', 'utility_id_eia':'Int32',
         'duplicate_utility_id_eia':'boolean' },
    'utility_state_map':
        {'respondent_id':'int32',
         'capacity_owned_in_state':'float64',
         'capacity_operated_in_state':'float64',
         'mwh_sales_in_state':'float64', }
}

# Per-table replacement values for missing data: {table name: {column: fill value}}.
# It is looked up by table name during ingestion, so every key must be spelled
# exactly like the corresponding dtype_dict key / CSV basename.
fillna_dict = {
    'assets_earnings_investments':
        {'asset_value': 0, 'earnings_value': 0, 'investment_value': 0},
    # The table is 'customers_sales' (plural), matching dtype_dict; with any other
    # spelling the lookup silently never matches and nothing is filled.
    'customers_sales':
        {'customers': 0, 'sales': 0, 'revenues': 0}
}

# Per-table columns that must be present: rows with a missing value in any of the
# listed columns are dropped during ingestion.  Only the keys of the inner mapping
# are used for that; the dtype value is informational.
dropna_dict = {
    'emissions_targets': {'respondent_id':'int32'},
}

Create the actual metadata for the source.  In this case, it is *rmi_utility_transition_hub*

We read and interpret the data dictionary that comes with the data sources.

In [6]:
import io
import zipfile
import pandas as pd

def _load_rmi_release(zip_key, dictionary_member):
    """Fetch one RMI release archive from the source bucket.

    Parameters:
        zip_key: object key of the release zip inside `source_bucket`.
        dictionary_member: name of the data-dictionary workbook inside the zip.

    Returns:
        (archive, sheets): the open `zipfile.ZipFile` (still needed later to read
        the CSV members) and a dict mapping each sheet name of the data
        dictionary to a DataFrame whose cells are all read as strings.
    """
    archive_bytes = io.BytesIO(source_bucket.Object(zip_key).get()['Body'].read())
    archive = zipfile.ZipFile(archive_bytes, mode='r')
    # display(archive.filelist)
    # Read all the sheets.  sheets['sheet_name'] gives a specific sheet.
    # The workbook bytes are wrapped in BytesIO because passing raw bytes to
    # read_excel is deprecated in recent pandas releases.
    workbook_buffer = io.BytesIO(archive.read(dictionary_member))
    sheets = pd.read_excel(workbook_buffer, sheet_name=None, dtype=str)
    return archive, sheets


# Earlier releases are kept for reference but are deliberately not loaded.
if False:
    rmi_20210929_zip, rmi_20210929_xls = _load_rmi_release(
        'RMI/RMI-20210929.zip',
        'data_download/RMI Utility Transition Hub Data Dictionary.xlsx')
    rmi_20211120_zip, rmi_20211120_xls = _load_rmi_release(
        'RMI/RMI-20211120.zip',
        'data_dictionary.xlsx')

# Current release: the archive stays open so that its CSV files can be read during ingestion
rmi_20220119_zip, rmi_20220119_xls = _load_rmi_release(
    'RMI/RMI-20220119.zip',
    'data_dictionary.xlsx')

In [7]:
def drop_cols_by_index(df, index):
    """Return `df` without the columns at the given zero-based positions.

    Parameters:
        df: the DataFrame to take columns from; it is not modified.
        index: a single integer position, or a list/tuple of positions.
            The caller's sequence is not modified.

    Returns:
        A selection of `df` (via `iloc`) holding the remaining columns in
        their original left-to-right order.

    Raises:
        ValueError: if a position is out of range or listed more than once.
    """
    # Work on a copy so the caller's list is never reordered behind its back
    positions = list(index) if isinstance(index, (list, tuple)) else [index]
    remaining = list(range(df.shape[1]))
    for position in positions:
        # list.remove raises ValueError for an unknown or already-removed position
        remaining.remove(position)
    return df.iloc[:, remaining]

def generate_overview_meta(dd, release_date):
    """Build the dataset-level metadata from the data dictionary's Overview sheet.

    Parameters:
        dd: the Overview sheet as a DataFrame with columns 'Unnamed: 1' and
            'Unnamed: 2'.  Rows 0-3 hold the title, general overview, scope and
            limitations; rows from 5 onward list table names and descriptions.
            The caller's frame is not modified.
        release_date: human-readable release label, embedded in 'version'.

    Returns:
        dict of dataset-level metadata (title, description, version, uri, ...).

    Side effects:
        Stores the table name/description listing in the global `overview_dd`
        (columns 'Name' and 'Description'), which the per-table metadata
        generators read.
    """
    global overview_dd
    print('Generating Overview...')
    title = dd.loc[0, 'Unnamed: 1']
    general_overview = dd.loc[1, 'Unnamed: 2']
    scope = dd.loc[2, 'Unnamed: 2']
    limitations_to_scope = dd.loc[3, 'Unnamed: 2']
    # Drop non-table data captured above
    dd = dd.drop(list(range(0,5)))
    # `drop` returned a new frame, so relabelling it leaves the caller's sheet untouched.
    # (set_axis(..., inplace=True) is avoided: the `inplace` argument was removed in pandas 2.0.)
    dd.columns = ['Name', 'Description']
    meta_content = {'title':title, 'description':general_overview, 'version':f'Released {release_date}', 'uri':'https://utilitytransitionhub.rmi.org/data-download/',
                    'copyright':'© 2021 RMI', 'license':'Creative Commons Attribution-Noncommercial 4.0 International Public License (CC BY-NC)',
                    'contact':'utilitytransitionhub@rmi.org', 'abstract':'\n'.join([scope, limitations_to_scope]), 'name':'rmi_utility_transition_hub' }
    overview_dd = dd
    return meta_content

In [8]:
import math

# Translation of unit spellings found in the data dictionary into the dimension
# strings stored in the metadata; units not listed here are passed through unchanged.
units_dict = {
    '$': 'USD',
    'number': '',
}

def generate_generic_meta(sheet, dd):
    """Build table-level and field-level metadata for a plain data-dictionary sheet.

    The sheet is expected to hold a header row (index label 0), one row per
    field (name, description, unit in the first three columns) and two trailer
    rows whose first cells give an extra metadata key and its value.

    Note: the header row is installed as column labels on the frame passed in.

    Returns (meta_fields, meta_content).
    """
    global overview_dd

    is_this_sheet = overview_dd['Name']==sheet
    description = overview_dd.loc[is_this_sheet, 'Description'].to_string(index=False)
    meta_content = {
        'tname': sheet,
        'parent_schema': schemaname,
        'description': description,
        # The last two rows hold a free-form key (second to last) and its value (last)
        dd.iloc[-2,0]: dd.iloc[-1,0],
    }

    # Promote the header row to column labels, then keep only the field rows
    dd.columns = list(dd.iloc[0])
    trailer_rows = dd.tail(2).index
    field_rows = dd.drop(0, axis=0).drop(trailer_rows).fillna(value='')

    field_names = list(field_rows.iloc[:, 0])
    meta_fields = {name: {'description': text}
                   for name, text in zip(field_names, field_rows.iloc[:, 1])}
    for name, dim in zip(field_names, field_rows.iloc[:, 2]):
        if dim=='':
            # No unit given for this field
            continue
        meta_fields[name]['dimension'] = units_dict.get(dim, dim)

    return meta_fields, meta_content

In [9]:
def generate_assets_meta(sheet, dd):
    """Build metadata for the assets sheet, whose layout carries extra enumeration columns.

    Besides the usual header row and two trailer rows, this sheet has two
    columns that only list enumeration values and a combined
    'asset & sub_asset' row that is split into two separate fields here.

    Note: the header row is installed as column labels on the frame passed in.

    Returns (meta_fields, meta_content).
    """
    global overview_dd
    is_this_sheet = overview_dd['Name']==sheet
    description = overview_dd.loc[is_this_sheet, 'Description'].to_string(index=False)
    meta_content = {
        'tname': sheet,
        'parent_schema': schemaname,
        'description': description,
        # The last two rows hold a free-form key (second to last) and its value (last)
        dd.iloc[-2,0]: dd.iloc[-1,0],
    }
    dd.columns = list(dd.iloc[0])
    trailer_rows = dd.tail(2).index
    dd = dd.drop(0, axis=0).drop(trailer_rows)

    ### ??? What should we do with enumerations of asset and sub_asset types?

    # Drop columns that exist only to hold enumeration values
    dd = drop_cols_by_index(dd, [1,2])
    # Asset & sub_asset are actually two fields: the combined row becomes 'asset'
    # and the row directly below it becomes 'sub_asset'
    asset_rows = dd[dd['Data field']=='asset & sub_asset'].index
    sub_asset_rows = asset_rows+1
    dd.loc[asset_rows, 'Data field'] = 'asset'
    dd.loc[asset_rows, 'Definition'] = "RMI's categorization of assets based on grouping of [steam,nuclear,hydro,renewables,other_fossil,transmission,distribution,other]"
    dd.loc[sub_asset_rows, 'Data field'] = 'sub_asset'
    dd.loc[sub_asset_rows, 'Definition'] = "RMI's categorization of sub_assets when asset=other based on grouping of [AROs,construction_work_in_progress,distribution_arc,electric_plant_held_for_future_use,electric_plant_leased_to_others,experimental_plant,general_plant,general_plant_arc,hydro_arc,intangible_plant,net_ADIT,net_regulatory_assets,net_working_capital,nuclear_arc,other_deferred_debits_and_credits,other_electric_plant,other_fossil_arc,other_noncurrent_liabilities,regional_transmission_and_market_operation,renewables_arc,steam_arc,transmission_arc]"
    # Rows without a field name only carried enumeration values
    dd = dd.dropna(subset=['Data field']).fillna(value='')

    field_names = list(dd.iloc[:, 0])
    meta_fields = {name: {'description': text}
                   for name, text in zip(field_names, dd.iloc[:, 1])}
    for name, dim in zip(field_names, dd.iloc[:, 2]):
        if dim=='':
            # No unit given for this field
            continue
        meta_fields[name]['dimension'] = units_dict.get(dim, dim)
    return meta_fields, meta_content

In [10]:
def generate_emissions_meta(sheet, dd):
    """Build metadata for an operations/emissions sheet and remember it globally.

    The sheet has one column that only lists code enumerations; that column and
    the rows without a field name are discarded.

    Note: the header row is installed as column labels on the frame passed in.

    Side effects: the result is also stored in the globals
    `operations_emissions_fields` and `operations_emissions_content`.

    Returns (meta_fields, meta_content).
    """
    global overview_dd
    global operations_emissions_fields, operations_emissions_content

    is_this_sheet = overview_dd['Name']==sheet
    description = overview_dd.loc[is_this_sheet, 'Description'].to_string(index=False)
    meta_content = {
        'tname': sheet,
        'parent_schema': schemaname,
        'description': description,
        # The last two rows hold a free-form key (second to last) and its value (last)
        dd.iloc[-2,0]: dd.iloc[-1,0],
    }
    dd.columns = list(dd.iloc[0])
    trailer_rows = dd.tail(2).index
    dd = dd.drop(0, axis=0).drop(trailer_rows)

    ### ??? What should we do with code enumerations?

    # Drop columns that exist only to hold enumeration values
    dd = drop_cols_by_index(dd, 1)
    # Strip out enumerations of codes
    dd = dd.dropna(subset=['Data field']).fillna(value='')

    field_names = list(dd.iloc[:, 0])
    meta_fields = {name: {'description': text}
                   for name, text in zip(field_names, dd.iloc[:, 1])}
    for name, dim in zip(field_names, dd.iloc[:, 2]):
        if dim=='':
            # No unit given for this field
            continue
        meta_fields[name]['dimension'] = units_dict.get(dim, dim)

    # Kept for generate_other_generation_meta, which reuses these field descriptions
    operations_emissions_fields = meta_fields
    operations_emissions_content = meta_content
    return meta_fields, meta_content

In [11]:
def generate_bills_meta(sheet, dd):
    """Build metadata for sheets that carry one extra code-enumeration column.

    Note: the header row is installed as column labels on the frame passed in.

    Returns (meta_fields, meta_content).
    """
    global overview_dd
    is_this_sheet = overview_dd['Name']==sheet
    description = overview_dd.loc[is_this_sheet, 'Description'].to_string(index=False)
    meta_content = {
        'tname': sheet,
        'parent_schema': schemaname,
        'description': description,
        # The last two rows hold a free-form key (second to last) and its value (last)
        dd.iloc[-2,0]: dd.iloc[-1,0],
    }
    dd.columns = list(dd.iloc[0])
    # Temporary fix for bad November 2020 data
    if sheet=='revenue_by_tech':
        padded_names = {' revenue_total ':'revenue_total', ' revenue_residential ':'revenue_residential'}
        dd = dd.rename(columns=padded_names)
    trailer_rows = dd.tail(2).index
    dd = dd.drop(0, axis=0).drop(trailer_rows)

    ### ??? What should we do with code enumerations?

    # Drop columns that exist only to hold enumeration values
    dd = drop_cols_by_index(dd, 1)
    # Strip out enumerations of codes
    dd = dd.dropna(subset=['Data field']).fillna(value='')

    field_names = list(dd.iloc[:, 0])
    meta_fields = {name: {'description': text}
                   for name, text in zip(field_names, dd.iloc[:, 1])}
    for name, dim in zip(field_names, dd.iloc[:, 2]):
        if dim=='':
            # No unit given for this field
            continue
        meta_fields[name]['dimension'] = units_dict.get(dim, dim)
    return meta_fields, meta_content

In [12]:
def generate_other_generation_meta(dd):
    """Build metadata for the derived `other_generation` table.

    Parameters:
        dd: DataFrame whose column labels name the fields of the derived table.

    Returns:
        (meta_fields, meta_content).  `meta_fields` reuses the entries of the
        global `operations_emissions_fields` for every column of `dd` that has
        one, in the column order of `dd` so the result is the same on every
        run; columns without an entry are left out.
    """
    meta_content = { 'tname':'other_generation', 'parent_schema':schemaname,
                     'description': 'EE & DR as well as Purchased Power, all of which count toward `avoided generation`',
                     'Additional notes': 'Table derived from operation_emissions_by_field by OS-Climate'}
    # Iterate the columns rather than a set intersection: set ordering of strings
    # varies between interpreter runs, which made the field order unstable.
    meta_fields = { column: operations_emissions_fields[column]
                    for column in dd.columns if column in operations_emissions_fields }
    return meta_fields, meta_content

The Data Dictionary is in an XLSX workbook
The actual Data lives in separate CSV files

In [13]:
# Placeholders for the derived 'other_generation' table and its metadata
other_generation_meta_fields = None
other_generation_meta_content = None
other_generation_df = None

def generate_sheet_meta(wb, sheet, release_date):
    """Dispatch one data-dictionary sheet to the metadata generator for its layout.

    Parameters:
        wb: mapping of sheet name to DataFrame (the data-dictionary workbook).
        sheet: name of the sheet / table to describe.
        release_date: release label, used only for the 'Overview' sheet.

    Returns (meta_fields, meta_content); both are empty for 'data_sources' and
    `meta_fields` is empty for 'Overview'.
    """
    # For as-yet unexplained reasons, RMI renamed the file assets_earnings_investments.csv
    # without updating the name of the sheet in the spreadsheet.  We switch to the intended sheetname here
    if sheet == 'assets_earnings':
        sheet = 'assets_earnings_investments'

    # Remove empty column that appears in all of the spreadsheets
    dd = wb[sheet].drop('Unnamed: 0', axis=1)

    # Sheets that need special treatment and do not follow the (sheet, dd) calling pattern
    if sheet=='Overview':
        meta_content = generate_overview_meta(dd, release_date)
        print('Metadata Overview')
        print(meta_content)
        return {}, meta_content
    if sheet=='data_sources':
        return {}, {}
    if sheet=='other_generation':
        return generate_other_generation_meta(dd)

    # Everything else is handled by a generator chosen by sheet layout
    if sheet=='assets_earnings_investments':
        build_meta = generate_assets_meta
    elif sheet in ('employees', 'expenditure_bills_burden', 'expenditure_bills_burden_detail', 'revenue_by_tech'):
        # These tables have the same essential shape
        build_meta = generate_bills_meta
    elif sheet in ('operations_emissions_by_fuel', 'operations_emissions_by_tech'):
        build_meta = generate_emissions_meta
    else:
        # Includes 'state_utility_policies', which needs no special handling here
        build_meta = generate_generic_meta
    return build_meta(sheet, dd)

In [14]:
from io import BytesIO
import datetime

# Releases to ingest: target schema name -> (release label, data-dictionary sheets, open zip archive).
# Entries for earlier releases are kept below, commented out, for reference.
rmi_ingest_schemas = { # 'rmi_20210929': ('September 2021', rmi_20210929_xls, rmi_20210929_zip),
                       # 'rmi_20211120': ('November 2021', rmi_20211120_xls, rmi_20211120_zip),
                       'sandbox': ('January 2022', rmi_20220119_xls, rmi_20220119_zip),}

def first_valid_col_value(df, col):
    """Return the first non-missing value of column `col` in `df`.

    Returns None when the column holds no valid value at all, so that callers
    assigning the result back leave the data effectively unchanged instead of
    failing.  The lookup is positional, so it also yields a single scalar when
    index labels are duplicated.
    """
    valid_values = df[col].dropna()
    if valid_values.empty:
        return None
    return valid_values.iloc[0]

def cleanup_2020_numbers(s):
    """Normalize one raw 2020 `revenue_by_tech` cell so that it converts to float.

    The 2020 numbers come with extra spaces, comma separators, and sometimes
    parentheses instead of minus signs.

    Returns the value unchanged when it is already a float, the string 'nan'
    for missing/blank/'#NAME?' cells, and otherwise a cleaned numeric string;
    a parenthesised amount is returned with a leading minus sign.
    """
    if isinstance(s, float):
        return s
    # Test for missing values first: comparing pd.NA with a string is not a usable boolean
    if s is None or pd.isnull(s):
        return 'nan'
    s = s.strip().replace(',','')
    if s=='' or s=='#NAME?':
        return 'nan'
    if s[0]=='(':
        # Accounting notation: (123) means -123
        return '-' + s.strip('()').strip()
    return s


# There is no datafile behind the data dictionary.  Run this to prime overview_dd, which all other metadata-finding depends upon
# The archive is bound to `rmi_zip` (not `zipfile`) so that the imported zipfile module is not clobbered.
for schemaname, (release_date, workbook, rmi_zip) in rmi_ingest_schemas.items():
    # The dataframe for Non-Null data
    df_nn = None
    overview_dd = None
    operations_emissions_content = None
    operations_emissions_fields = None

    overview_meta_fields, overview_meta_content = generate_sheet_meta(workbook, 'Overview', release_date)

    for zipinfo in rmi_zip.infolist():
        fname = zipinfo.filename
        ftimestamp = datetime.datetime(*zipinfo.date_time)
        if not fname.endswith('.csv'):
            continue
        if fname.startswith('__MACOSX/._'):
            continue

        tablename = fname.split('/')[-1].split('.')[0]
        # For as-yet unexplained reasons, RMI renamed the file assets_earnings_investments.csv
        # without updating the name of the sheet in the spreadsheet.  We switch to the intended name here
        if tablename == 'assets_earnings':
            tablename = 'assets_earnings_investments'
        if tablename == 'expenditure_bills_burden_detail':
            continue

        print(f"fname: '{fname}'; tablename = '{tablename}'")
        with rmi_zip.open(fname) as zf:
            if tablename=='state_utility_policies':
                df = pd.read_csv(zf, dtype={'respondent_id':'int32'},parse_dates=['date_updated'],dayfirst=True, engine='c')
            elif tablename.startswith('utility'):
                df = pd.read_csv(zf, dtype=dtype_dict[tablename], engine='c')
                # Schema names of dated releases sort chronologically, so this string comparison selects older releases only
                if tablename=='utility_information' and schemaname <= 'rmi_20211120':
                    # Correct information for 'American Transmission Co LLC', which is owned by 'WEC Energy Group'
                    df.loc[df.respondent_id==275, ['parent_name', 'parent_ticker', 'parent_ISIN', 'parent_LEI']] = df.loc[df.respondent_id==519, ['parent_name', 'parent_ticker', 'parent_ISIN', 'parent_LEI']].values
                    df = df.rename(columns={'parent_ISIN':'parent_isin', 'parent_LEI':'parent_lei'})
                if tablename=='utility_information':
                    # Correct several LEI errors and omissions
                    df.loc[df.parent_name=='American Electric Power Co., Inc.', 'parent_lei'] = '1B4S6S7G0TW5EE83BO58'
                    df.loc[df.parent_name=='American States Water Co.', 'parent_isin'] = first_valid_col_value(df[df.parent_name=='American States Water Co.'], 'parent_isin')
                    df.loc[df.parent_name=='American States Water Co.', 'parent_lei'] = '529900L26LIS2V8PWM23'
                    df.loc[df.parent_name=='Berkshire Hathaway, Inc.', 'parent_lei'] = '5493000C01ZX7D35SD85'
                    df.loc[df.parent_name=='Citizens Energy Corp.', 'parent_lei'] = '5493008ORX814MK1WM19'
                    df.loc[df.parent_name=='FirstEnergy Corp.', 'parent_lei'] = '549300SVYJS666PQJH88'
                    df.loc[df.parent_name=='LS Power', 'parent_lei'] = '549300Z88AAE0R1YHI77'
                    df.loc[df.parent_name=='NextEra Energy, Inc.', 'parent_lei'] = 'UMI46YPGBLUE4VGNNT48'
                    df.loc[df.parent_name=='PG&E Corp.', 'parent_lei'] = '8YQ2GSDWYZXO2EDN3511'
                    df.loc[df.parent_name=='Sempra', 'parent_isin'] = first_valid_col_value(df[df.parent_name=='Sempra'], 'parent_isin')
                    df.loc[df.parent_name=='Sempra', 'parent_lei'] = 'PBBKGKLRK5S5C0Y4T545'
                    df.loc[df.parent_name=='Unitil Corp.', 'parent_lei'] = '549300EYGHO5EZE7RL80'
                    df.loc[df.parent_name=='Verso Corp.', 'parent_lei'] = '549300FODXCTQ8DGT594'
                    df.loc[df.parent_name=='Verso Corp.', 'parent_isin'] = 'US92531L2079'
            elif tablename=='state_targets':
                df = pd.read_csv(zf, dtype=dtype_dict[tablename], engine='c')
                # Assign the column back instead of a chained in-place fillna, which does not
                # update the frame when pandas Copy-on-Write is active.  Missing years become '-1'.
                df['year'] = df['year'].astype('string').fillna('-1')
                df.loc[df.year.isin(['Annual','2005/1990']), 'year'] = '-1'
                # '-1' cannot be parsed as a year, so errors='coerce' turns it into NaT
                df.year = pd.to_datetime(df.year.map(lambda x: x.split('.')[0]).astype('int32'), format='%Y', errors='coerce')
            else:
                df = pd.read_csv(zf, dtype=dtype_dict[tablename], thousands=',', engine='c')
                if 'year' in df.columns:
                    # YEAR is sometimes a floating point number: go through strings and cut at the decimal point
                    df['year'] = df['year'].astype('string').fillna('-1')
                    df.year = pd.to_datetime(df.year.map(lambda x: x.split('.')[0]).astype('int32'), format='%Y', errors='coerce')
                if tablename=='revenue_by_tech':
                    df.rename(columns={' revenue_total ':'revenue_total', ' revenue_residential ':'revenue_residential'}, inplace=True)
                    df.revenue_total = df.revenue_total.map(cleanup_2020_numbers).astype('float64')
                    df.revenue_residential = df.revenue_residential.map(cleanup_2020_numbers).astype('float64')
            df = df.convert_dtypes(infer_objects=False, convert_string=True, convert_integer=False, convert_boolean=False, convert_floating=False)
            df.info(verbose=True)

        if tablename in dropna_dict:
            df.dropna(subset=list(dropna_dict[tablename].keys()), inplace=True)
        if tablename in fillna_dict:
            df.fillna(value=fillna_dict[tablename], inplace=True)

        custom_meta_fields, custom_meta_content = generate_sheet_meta(workbook, tablename, release_date)
        if tablename in ['operations_emissions_by_fuel', 'operations_emissions_by_tech']:
            # Both tables duplicate the 'Purchased Power' and 'EE & DR' data.
            # We only need one copy, which we create as 'other_generation'
            if df_nn is None:
                df_anon = df.loc[df['plant_name_eia'].isna()].copy()
                # Drop many NULL columns we don't need
                df_anon.dropna(axis=1, how='all', inplace=True)
                other_generation_df = df_anon
                custom_gen_meta_fields, custom_gen_meta_content = generate_other_generation_meta(df_anon)

                ingest_table = 'other_generation'
                columnschema = osc.create_table_schema_pairs(df_anon,
                                                             typemap={"datetime64[ns]":"timestamp(6)"})

                tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{ingest_table}(
{columnschema}
) with (
    format = 'ORC',
    partitioning = array['year']
)
"""
                print(tabledef)
                qres = engine.execute(tabledef)
                print(qres.fetchall())
                df_anon.to_sql(ingest_table,
                               con=engine, schema=ingest_schema, if_exists='append',
                               index=False,
                               method=osc.TrinoBatchInsert(batch_size = 2000, verbose = False))
                df_nn = df.loc[~df['operating_month'].isna()]
            df.dropna(subset=['plant_name_eia'], inplace=True)
            # For some reason, data before 2010 is sometimes not filled in.
            for index, row in df[df['operating_month'].isna()].iterrows():
                # df_nn is only computed once, from either 'operations_emissions_by_fuel' or 'operations_emissions_by_tech'
                df0 = df_nn.loc[(df_nn['respondent_id']==row['respondent_id']) & (df_nn['generator_id']==row['generator_id']), ['operating_month', 'operating_year']]
                if len(df0)==0:
                    # In this case we have no prior data to refer to
                    continue
                om, oy = df0.iloc[0]
                # A list (not a tuple) of column labels keeps the indexer unambiguous
                df.loc[index, ['operating_month', 'operating_year']] = om, oy
            # ICEBERG does not support integers less than 32 bits
            # for colname in ['operating_month', 'operating_year', 'retirement_month', 'retirement_year']:
            #     df[colname] = pd.to_numeric(df[colname],downcast='integer')
            #     pass
        elif tablename=='emissions_targets':
            # Needed because respondent_id 191 data is duplicated (and 121 is kinda duplicated, too).
            df.drop_duplicates(subset=None, keep='first', inplace=True, ignore_index=True)
            df.loc[df['CO2_intensity_historical']==np.inf, ['CO2_historical', 'CO2_intensity_historical']] = [0, 0]
            df['CO2_intensity_historical'] = df['CO2_intensity_historical'].astype('float64')

            # Fix discrepancies between emissions_targets and the other files that define/reference parent_name
            df.loc[df.parent_name=='American States Water', 'parent_name'] = 'American States Water Co.'
            df.loc[df.parent_name=='CMS Energy', 'parent_name'] = 'CMS Energy Corp.'
            df.loc[df.parent_name=='Emera Inc.', 'parent_name'] = 'Versant Power'
            df.loc[df.parent_name=='Fortis, Inc', 'parent_name'] = 'Fortis, Inc.'
            df.loc[df.parent_name=='National Grid plc', 'parent_name'] = 'National Grid PLC'
            df.loc[df.parent_name=='NorthWestern Corp.', 'parent_name'] = 'Northwestern Corp.'
            df.loc[df.parent_name=='OG&E Energy', 'parent_name'] = 'OG&E Energy Corp.'
            df.loc[df.parent_name=='PPL', 'parent_name'] = 'PPL Corp.'
            df.loc[df.parent_name=='Sempra Energy', 'parent_name'] = 'Sempra'
            df.loc[df.parent_name=='Verso', 'parent_name'] = 'Verso Corp.'
        # if tablename in tidy_dict:
        #     tidy_df = df.melt(id_vars=tidy_dict[tablename][0], value_vars=tidy_dict[tablename][2],
        #                       var_name=tidy_dict[tablename][1][0], value_name=tidy_dict[tablename][1][1])
        #     tidy_df[tidy_dict[tablename][1][0]] = tidy_df[tidy_dict[tablename][1][0]].apply(lambda x: x.split('_')[0])
        #     tidy_df.dropna(subset=[tidy_dict[tablename][1][1]],inplace=True)

        ingest_table = tablename
        columnschema = osc.create_table_schema_pairs(df, typemap={"datetime64[ns]":"timestamp(6)"})

        # Partition on the first of these columns that the table has
        if 'year' in df.columns:
            partition = 'year'
        elif 'first_report_year' in df.columns:
            partition = 'first_report_year'
        elif 'state' in df.columns:
            partition = 'state'
        else:
            raise KeyError(f"table '{tablename}' has no partition column ('year', 'first_report_year' or 'state')")

        tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{ingest_table}(
{columnschema}
) with (
    format = 'ORC',
    partitioning = array['{partition}']
)
"""
        print(tabledef)
        qres = engine.execute(tabledef)
        print(qres.fetchall())
        df.to_sql(ingest_table,
                  con=engine, schema=ingest_schema, if_exists='append',
                  index=False,
                  method=osc.TrinoBatchInsert(batch_size = 2000, verbose = False))
    rmi_zip.close()

print('Done!')

Generating Overview...
Metadata Overview
{'title': 'RMI Utility Transition Hub Data Dictionary', 'description': 'The RMI Utility Transition Hub Data Download is a collection of publicly available data, organized and used to calculate key metrics that describe the US utility transition.\n\nThis data dictionary describes each data file in detail, including definitions, units, data sources, and methodology.\n\nFor downloadable data, visit https://utilitytransitionhub.rmi.org/data-download/.\nFor interactive data visualizations, visit https://utilitytransitionhub.rmi.org/portal/.\nFor analyses and insights, visit https://utilitytransitionhub.rmi.org/insights/.', 'version': 'Released January 2022', 'uri': 'https://utilitytransitionhub.rmi.org/data-download/', 'copyright': '© 2021 RMI', 'license': 'Creative Commons Attribution-Noncommercial 4.0 International Public License (CC BY-NC)', 'contact': 'utilitytransitionhub@rmi.org', 'abstract': 'Utilities coverage: all current FERC Form 1 respond

Iterate through tablenames until we get to *Additional Information*, storing all the names and descriptions of the tables.

We've already collected the field names, types, dimensions, etc., of each table into our dataframes.
But there's additional metadata we can collect from the data dictionary

## Load metadata following an ingestion process into trino metadata store

### The schema is *metastore*, and the table names are *meta_schema*, *meta_table*, *meta_field*