In [1]:
import os
import pathlib
from dotenv import load_dotenv
import boto3
import json
import warnings

import pandas as pd
import numpy as np
import osc_ingest_trino as osc
import trino
from sqlalchemy.engine import create_engine

import ITR

from ITR.configs import ITR_median, ITR_mean
from ITR.data.data_warehouse import DataWarehouse
from ITR.portfolio_aggregation import PortfolioAggregationMethod
from ITR.temperature_score import TemperatureScore

from ITR.data.base_providers import BaseProviderProductionBenchmark, BaseProviderIntensityBenchmark
from ITR.data.template import TemplateProviderCompany
from ITR.interfaces import EScope, ETimeFrames, EScoreResultType, IEIBenchmarkScopes, IProductionBenchmarkScopes, ProjectionControls
# from ITR.configs import LoggingConfig

from ITR.data.osc_units import ureg, Q_, PA_, asPintSeries
from pint import Quantity
from pint_pandas import PintArray, PintType

import logging

  for label, val in metric_conversion.iteritems():


In [2]:
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # LoggingConfig.FORMAT
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

logger.info("Start!")

examples_dir ='' #'examples'
data_dir="data"
data_json_units_dir="json-units"
root = os.path.abspath('')

company_data_path = os.path.join(root, examples_dir, data_dir, "20230106 ITR V2 Sample Data.xlsx")

# Production benchmark (there's only one, and we have to stretch it from OECM to cover TPI)
benchmark_prod_json_file = "benchmark_production_OECM.json"
benchmark_prod_json = os.path.join(root, examples_dir, data_dir, data_json_units_dir, benchmark_prod_json_file)
with open(benchmark_prod_json) as json_file:
    parsed_json = json.load(json_file)

oil_prod_bm = [bm for bm in parsed_json['AnyScope']['benchmarks'] if bm['sector']=='Oil'][0]
oil_base_prod = ureg(oil_prod_bm['base_year_production']).to('MJ')
oil_prod_series = pd.DataFrame({**{'year': [oil_prod['year'] for oil_prod in oil_prod_bm['projections_nounits']]},
                                **{'value': [oil_prod['value'] for oil_prod in oil_prod_bm['projections_nounits']]}}).set_index('year').squeeze()
oil_prod_series = oil_prod_series.add(1).cumprod().mul(oil_base_prod.m)

gas_prod_bm = [bm for bm in parsed_json['AnyScope']['benchmarks'] if bm['sector']=='Gas'][0]
gas_base_prod = ureg(gas_prod_bm['base_year_production']).to('MJ')
gas_prod_series = pd.DataFrame({**{'year': [gas_prod['year'] for gas_prod in gas_prod_bm['projections_nounits']]},
                                **{'value': [gas_prod['value'] for gas_prod in gas_prod_bm['projections_nounits']]}}).set_index('year').squeeze()
gas_prod_series = gas_prod_series.add(1).cumprod().mul(gas_base_prod.m)

oil_and_gas_prod_series = oil_prod_series.add(gas_prod_series).div(oil_base_prod.m + gas_base_prod.m)
oil_and_gas_prod_series = oil_and_gas_prod_series.div(oil_and_gas_prod_series.shift(1))
oil_and_gas_prod_series.iloc[0] = 1.0
oil_and_gas_prod_series = oil_and_gas_prod_series.sub(1)

oil_and_gas_bm = dict(oil_prod_bm)
oil_and_gas_bm['sector'] = 'Oil & Gas'
oil_and_gas_bm['base_year_production'] = f"{oil_base_prod + gas_base_prod:~P}"
oil_and_gas_bm['projections_nounits'] = [ {'year': year, 'value': value} for year,value in oil_and_gas_prod_series.to_dict().items() ]
parsed_json['AnyScope']['benchmarks'].append(oil_and_gas_bm)

# coal_prod_bm = dict([bm for bm in parsed_json['AnyScope']['benchmarks'] if bm['sector']=='Coal'][0])
# coal_base_prod = ureg(coal_prod_bm['base_year_production']).to('MJ')

# coal_prod_bm['sector'] = 'Diversified Mining'
# coal_prod_bm['base_year_production'] = f"{coal_base_prod:~P}"
# parsed_json['AnyScope']['benchmarks'].append(coal_prod_bm)

prod_bms = IProductionBenchmarkScopes.parse_obj(parsed_json)
base_production_bm = BaseProviderProductionBenchmark(production_benchmarks=prod_bms)
logger.info('Load production benchmark from {}'.format(benchmark_prod_json_file))

# Emission intensities
benchmark_EI_OECM_PC_file = "benchmark_EI_OECM_PC.json"
benchmark_EI_OECM_S3_file = "benchmark_EI_OECM_S3.json"
benchmark_EI_OECM_file = "benchmark_EI_OECM.json" # Deprecated!
benchmark_EI_TPI_15_file = "benchmark_EI_TPI_1_5_degrees.json"
benchmark_EI_TPI_file = "benchmark_EI_TPI_2_degrees.json"
benchmark_EI_TPI_below_2_file = "benchmark_EI_TPI_below_2_degrees.json"
benchmark_EI_TPI_2deg_high_efficiency_file = "benchmark_EI_TPI_2_degrees_high_efficiency.json"
benchmark_EI_TPI_2deg_shift_improve_file = "benchmark_EI_TPI_2_degrees_shift_improve.json"

# loading dummy portfolio
df_portfolio = pd.read_excel(company_data_path, sheet_name="Portfolio")
companies = ITR.utils.dataframe_to_portfolio(df_portfolio)
logger.info('Load dummy portfolio from {}. You could upload your own portfolio using the template.'.format(company_data_path))

2023-03-13 12:09:52,697 - __main__ - INFO - Start!
2023-03-13 12:09:53,077 - __main__ - INFO - Load production benchmark from benchmark_production_OECM.json
2023-03-13 12:09:53,312 - __main__ - INFO - Load dummy portfolio from /opt/app-root/src/ITR/examples/data/20230106 ITR V2 Sample Data.xlsx. You could upload your own portfolio using the template.


In [3]:
eibm = 'OECM_S3'

if eibm == 'OECM_PC':
    benchmark_file = benchmark_EI_OECM_PC_file
elif eibm == 'OECM_S3':
    benchmark_file = benchmark_EI_OECM_S3_file
elif eibm.startswith('TPI_2_degrees'):
    benchmark_file = benchmark_EI_TPI_file
elif eibm == 'TPI_15_degrees':
    benchmark_file = benchmark_EI_TPI_15_file
elif eibm == 'OECM':
    benchmark_file = benchmark_EI_OECM_file
    logger.info('OECM benchmark is for backward compatibility only.  Use OECM_PC instead.')
else:
    benchmark_file = benchmark_EI_TPI_below_2_file
# load intensity benchmarks
benchmark_EI = os.path.join(root, examples_dir, data_dir, data_json_units_dir, benchmark_file)
with open(benchmark_EI) as json_file:
    parsed_json = json.load(json_file)
if eibm.startswith('TPI_2_degrees'):
    extra_EI = os.path.join(root, examples_dir, data_dir, data_json_units_dir,
                            benchmark_EI_TPI_2deg_high_efficiency_file if '_high_efficiency' in eibm
                            else benchmark_EI_TPI_2deg_shift_improve_file)
    with open(extra_EI) as json_file:
        extra_json = json.load(json_file)
        for scope_name in EScope.get_scopes():
            if scope_name in extra_json:
                if scope_name not in parsed_json:
                    parsed_json[scope_name] = extra_json[scope_name]
                else:
                    parsed_json[scope_name]['benchmarks'] += extra_json[scope_name]['benchmarks']
EI_bm = BaseProviderIntensityBenchmark(EI_benchmarks=IEIBenchmarkScopes.parse_obj(parsed_json))

In [4]:
template_company_data = TemplateProviderCompany(company_data_path, projection_controls = ProjectionControls())
Warehouse = DataWarehouse(template_company_data, benchmark_projected_production=None, benchmarks_projected_ei=None,
                          estimate_missing_data=DataWarehouse.estimate_missing_s3_data)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.rename(
2023-03-13 12:09:58,760 - ITR.data.template - ERROR - The following companies have ESG data defined but no fundamental data and will be removed from further analysis:
['US21037T1097' 'MYL3794OO004']
            (              'Ag Chem', 'NL0000009827',   'S2'),
            (    'Consumer Products', 'DE000SYM9999', 'S1S2'),
            ('Electricity Utilities', 'US0255371017',   'S2'),
            ('Electricity Utilities', 'US18551QAA58', 'S1S2'),
            ('Electricity Utilities', 'US18551QAA58',   'S2'),
            ('Electricity Utilities', 'US29364G1031',   'S1'),
            ('Electricity Utilities', 'US29364G1031',   'S2'),
            (               'Energy', 'PETRONAS_SOE',   'S1'),
            (               'Energy', 'PETRONAS_SOE',   'S2'),
            (  

In [5]:
# This updates benchmarks and all that depends on them (including trajectories)
Warehouse.update_benchmarks(base_production_bm, EI_bm)

2023-03-13 12:10:20,894 - ITR.data.data_warehouse - INFO - new_production_bm calculating trajectories for 112 companies (times 5 scopes times 31 years)
2023-03-13 12:10:47,327 - ITR.data.base_providers - INFO - Normalizing intensity metrics
2023-03-13 12:10:47,350 - ITR.data.base_providers - ERROR - intensity values for company US6745991058-chem not compatible with benchmark (CO2e * kilogram / USD)
2023-03-13 12:10:47,788 - ITR.data.base_providers - INFO - Done normalizing intensity metrics
2023-03-13 12:10:47,788 - ITR.data.data_warehouse - INFO - Allocating emissions to align with benchmark data
2023-03-13 12:10:47,795 - ITR.data.data_warehouse - INFO - Already allocated emissions for CA87807B1076 across ['Electricity Utilities', 'Gas', 'Oil']
2023-03-13 12:10:47,816 - ITR.data.data_warehouse - INFO - Sector alignment complete
2023-03-13 12:10:47,817 - ITR.data.data_warehouse - INFO - estimating missing data
2023-03-13 12:10:47,823 - ITR.data.data_warehouse - ERROR - Company US674599

In [6]:
# Load some standard environment variables from a dot-env file, if it exists.
# If no such file can be found, does not fail, and so allows these environment vars to
# be populated in some other way
dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', os.environ.get('PWD', '/opt/app-root/src'))
dotenv_path = pathlib.Path(dotenv_dir) / 'credentials.env'
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path,override=True)

In [7]:
print("Initializing Dev tables")

sqlstring = 'trino://{user}@{host}:{port}/'.format(
    user = os.environ['TRINO_USER_USER1'],
    host = os.environ['TRINO_HOST'],
    port = os.environ['TRINO_PORT']
)

ingest_catalog = 'osc_datacommons_dev'
ingest_schema = 'demo_dv'
itr_prefix = 'itr_'

sqlargs = {
    'auth': trino.auth.JWTAuthentication(os.environ['TRINO_PASSWD_USER1']),
    'http_scheme': 'https',
    'catalog': ingest_catalog,
    'schema': ingest_schema,
}

dev_engine = create_engine(sqlstring, connect_args = sqlargs)
print("connecting with engine " + str(dev_engine))
qres = osc._do_sql(f"show tables in {ingest_schema}", dev_engine, verbose=True)

Initializing Dev tables
connecting with engine Engine(trino://os-climate-user1@trino-secure-odh-trino.apps.odh-cl2.apps.os-climate.org:443/)
show tables in demo_dv
[('itr_benchmark_ei',), ('itr_benchmark_prod',), ('itr_company_data',), ('itr_cumulative_budget_1',), ('itr_cumulative_emissions',), ('itr_emissions_data',), ('itr_fundamental_data',), ('itr_overshoot_ratios',), ('itr_production_data',), ('itr_target_data',), ('itr_temperature_scores',), ('itr_trajectory_data',)]


In [8]:
# bucket must be configured with credentials for trino, and accessible to the hive catalog
# You may need to use a different prefix here depending on how you name your credentials.env variables
hive_bucket = osc.attach_s3_bucket('S3_HIVE')

hive_catalog = 'osc_datacommons_hive_ingest'
hive_schema = 'ingest'

# This will have identity of main notebook user, not OS-Climate-User1
ingest_engine = osc.attach_trino_engine(verbose=True, catalog=ingest_catalog, schema=ingest_schema)

using connect string: trino://MichaelTiemannOSC@trino-secure-odh-trino.apps.odh-cl2.apps.os-climate.org:443/osc_datacommons_dev/demo_dv


In [9]:
# If DF_COL contains Pint quantities (because it is a PintArray or an array of Pint Quantities),
# return a two-column dataframe of magnitudes and units.
# If DF_COL contains no Pint quanities, return it unchanged.

def dequantify_column(df_col: pd.Series) -> pd.DataFrame:
    if type(df_col.values)==PintArray:
        return pd.DataFrame({df_col.name: df_col.values.quantity.m,
                             df_col.name + "_units": str(df_col.values.dtype.units)},
                            index=df_col.index).convert_dtypes()
    elif df_col.size==0:
        return df_col
    elif df_col.map(lambda x: isinstance(x, Quantity)).any():
        values = df_col.map(lambda x: (x.m, x.u) if isinstance(x, Quantity) else x)
        return pd.DataFrame({df_col.name: df_col.map(lambda x: x.m if isinstance(x, Quantity) else x),
                             str(df_col.name) + "_units": df_col.map(lambda x: str(x.u) if isinstance(x, Quantity) else None)},
                            index=df_col.index).convert_dtypes()
    else:
        return df_col

# Rewrite dataframe DF so that columns containing Pint quantities are represented by a column for the Magnitude and column for the Units.
# The magnitude column retains the original column name and the units column is renamed with a _units suffix.
def dequantify_df(df: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([dequantify_column(df[col]) for col in df.columns], axis=1)

In [10]:
# Because this DF comes from reading a Trino table, and because columns must be unqiue, we don't have to enumerate to ensure we properly handle columns with duplicated names

def requantify_df(df: pd.DataFrame) -> pd.DataFrame:
    units_col = None
    columns_reversed = reversed(df.columns)
    for col in columns_reversed:
        if col.endswith("_units"):
            if units_col:
                # We expect _units column to follow a non-units column
                raise ValueError
            units_col = col
            continue
        if units_col:
            if str(col) + '_units' != units_col:
                raise ValueError
            if (df[units_col]==df[units_col][0]).all():
                # Make a PintArray
                new_col = PA_(df[col], dtype=f"pint[{df[units_col][0]}]")
            else:
                # Make a pd.Series of Quantity in a way that does not throw UnitStrippedWarning
                new_col = pd.Series(data=df[col], name=col) * pd.Series(data=df[units_col].map(lambda x: ureg(x).u), name=col)
            df = df.drop(columns=units_col)
            df[col] = new_col
            units_col = None
    return df

## ITR Company Data

In [11]:
df = template_company_data.df_fundamentals[[
    'company_name', 'company_lei', 'company_id',
    'sector', 'country', 'region', 'exposure', 'currency',
    'report_date',
    'company_market_cap', 'company_revenue',
    'company_enterprise_value', 'company_ev_plus_cash',
    'company_total_assets',
    'cash',
    'debt' ]].convert_dtypes()

df.cash = df.cash.astype("Float64")
df.debt = df.debt.astype("Float64")
df.rename(columns={'company_enterprise_value':'company_ev', 'company_ev_plus_cash':'company_evic', 'cash':'company_cash_equivalents', 'debt':'company_debt'}, inplace=True)
df['year'] = df.report_date.dt.year
df.drop(columns='report_date', inplace=True)

schema = osc.create_table_schema_pairs(df)

company_tablename = f"{itr_prefix}company_data"

osc._do_sql(f"drop table if exists {ingest_catalog}.{ingest_schema}.{company_tablename}", ingest_engine, verbose=True)


tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{company_tablename}(
{schema}
) with (
    format = 'ORC',
    partitioning = ARRAY['year']
)
"""

osc._do_sql(tabledef, ingest_engine, verbose=True)

df.to_sql(company_tablename, ingest_engine, schema=ingest_schema, if_exists="append",
          index=False,
          method=osc.TrinoBatchInsert(batch_size = 1000, verbose = True))

drop table if exists osc_datacommons_dev.demo_dv.itr_company_data

create table if not exists osc_datacommons_dev.demo_dv.itr_company_data(
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    country varchar,
    region varchar,
    exposure varchar,
    currency varchar,
    company_market_cap double,
    company_revenue double,
    company_ev double,
    company_evic double,
    company_total_assets double,
    company_cash_equivalents double,
    company_debt double,
    year bigint
) with (
    format = 'ORC',
    partitioning = ARRAY['year']
)

constructed fully qualified table name as: "demo_dv.itr_company_data"
inserting 112 records
  ('AES Corp.', '2NUNNB7D43COUIRE5295', 'US00130H1059', 'Electricity Utilities', 'US', 'North America', 'equity', 'USD', 9420000000.0, 10189000000.0, 8652000000.0, 9681000000.0, 31629120000.0, NULL, NULL, 2020)
  ('RWE AG', '529900GB7KCA94ACC940', 'DE0007037129', 'Electricity Utilities', 'DE', 'Europe', 

In [12]:
company_info_at_base_year = template_company_data.get_company_intensity_and_production_at_base_year([company_id for company_id in template_company_data.df_fundamentals.company_id.values if company_id !='US6745991058-chem'])

import warnings
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    # See https://github.com/hgrecco/pint-pandas/issues/128
    projected_production = Warehouse.benchmark_projected_production.get_company_projected_production(
        company_info_at_base_year)

In [13]:
production_tablename = f"{itr_prefix}production_data"

osc._do_sql(f"drop table if exists {ingest_catalog}.{ingest_schema}.{production_tablename}", ingest_engine, verbose=True)


production_tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{production_tablename}(
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    year bigint,
    production_by_year double,
    production_by_year_units varchar) with (
    format = 'ORC',
    partitioning = ARRAY['year']
)
"""

osc._do_sql(production_tabledef, ingest_engine, verbose=True)

# schema = osc.create_table_schema_pairs(production_data)

rename_year_columns={ y: f"y{y}" for y in range(2000,2100)}
df = projected_production.loc[(slice(None), EScope.S1), :].droplevel('scope')
df = df.rename(columns=rename_year_columns).reset_index()
df2 = pd.wide_to_long(df, stubnames='y', i='company_id', j='year')
df2 = df2.rename(columns={'y':'production_by_year'}).reset_index('year')
df3 = df2.merge(template_company_data.df_fundamentals[['company_name', 'company_lei', 'sector']], on='company_id')
production_df = df3
df4 = dequantify_df(df3).reset_index()
df4 = df4[['company_name', 'company_lei', 'company_id', 'sector', 'year', 'production_by_year', 'production_by_year_units']]

df4.to_sql(production_tablename, ingest_engine, schema=ingest_schema, if_exists="append",
           index=False,
           method=osc.TrinoBatchInsert(batch_size = 4000, verbose = True))

drop table if exists osc_datacommons_dev.demo_dv.itr_production_data

create table if not exists osc_datacommons_dev.demo_dv.itr_production_data(
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    year bigint,
    production_by_year double,
    production_by_year_units varchar) with (
    format = 'ORC',
    partitioning = ARRAY['year']
)

constructed fully qualified table name as: "demo_dv.itr_production_data"
inserting 3168 records
  ('SK Innovation', '988400PXP70BWVSJVF07', '096770.SK', 'Energy', 2019, 353023255.8139535, 'boe')
  ('SK Innovation', '988400PXP70BWVSJVF07', '096770.SK', 'Energy', 2020, 349718958.1395349, 'boe')
  ('SK Innovation', '988400PXP70BWVSJVF07', '096770.SK', 'Energy', 2021, 346445588.69134885, 'boe')
  ...
  ('Xcel Energy, Inc.', 'LGJNMI9GH8XIDG5RCM61', 'US98389B1008', 'Electricity Utilities', 2050, 163.27378363737597, 'terawatt_hour')
batch insert result: [(3168,)]


In [14]:
target_tablename = f"{itr_prefix}target_data"
trajectory_tablename = f"{itr_prefix}trajectory_data"

osc._do_sql(f"drop table if exists {ingest_catalog}.{ingest_schema}.{target_tablename}", ingest_engine, verbose=True)
osc._do_sql(f"drop table if exists {ingest_catalog}.{ingest_schema}.{trajectory_tablename}", ingest_engine, verbose=True)

# schema = osc.create_table_schema_pairs(target_data)

ei_schema = ',\n'.join([f"ei_{scope.lower()}_by_year double, ei_{scope.lower()}_by_year_units varchar" for scope in EScope.get_scopes()])

target_tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{target_tablename}(
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    year bigint,
    {ei_schema}) with (
    format = 'ORC',
    partitioning = ARRAY['year']
)
"""
trajectory_tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{trajectory_tablename}(
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    year bigint,
    {ei_schema}) with (
    format = 'ORC',
    partitioning = ARRAY['year']
)
"""

osc._do_sql(target_tabledef, ingest_engine, verbose=True)
osc._do_sql(trajectory_tabledef, ingest_engine, verbose=True)

drop table if exists osc_datacommons_dev.demo_dv.itr_target_data
drop table if exists osc_datacommons_dev.demo_dv.itr_trajectory_data

create table if not exists osc_datacommons_dev.demo_dv.itr_target_data(
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    year bigint,
    ei_s1_by_year double, ei_s1_by_year_units varchar,
ei_s2_by_year double, ei_s2_by_year_units varchar,
ei_s3_by_year double, ei_s3_by_year_units varchar,
ei_s1s2_by_year double, ei_s1s2_by_year_units varchar,
ei_s1s2s3_by_year double, ei_s1s2s3_by_year_units varchar) with (
    format = 'ORC',
    partitioning = ARRAY['year']
)


create table if not exists osc_datacommons_dev.demo_dv.itr_trajectory_data(
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    year bigint,
    ei_s1_by_year double, ei_s1_by_year_units varchar,
ei_s2_by_year double, ei_s2_by_year_units varchar,
ei_s3_by_year double, ei_s3_by_year_units varchar,
e

In [15]:
target_dfs = []
trajectory_dfs = []
projection_tablename = [ target_tablename, trajectory_tablename ]

for i, projection in enumerate(['projected_targets', 'projected_intensities']):
    projection_dfs = []
    for company in template_company_data._companies:
        ei_dict = {}
        for scope in EScope.get_scopes():
            if getattr(company, projection)[scope]:
                ei_dict[scope] = getattr(company, projection)[scope].projections
            else:
                ei_dict[scope] = pd.Series(dtype='object')
        ei_data = pd.concat([ei_dict[scope] for scope in EScope.get_scopes()], axis=1).reset_index()
        ei_data.columns = ['year'] + [f"ei_{scope.lower()}_by_year" for scope in EScope.get_scopes()]
        df = pd.DataFrame(data=[[company.company_name, '', company.company_id, company.sector] for i in ei_data.index],
                          columns=['company_name', 'company_lei', 'company_id', 'sector']).convert_dtypes()
        projection_dfs.append(pd.concat([df, ei_data], axis=1))
    df2 = pd.concat(projection_dfs).reset_index(drop=True).convert_dtypes()
    if projection_tablename[i]==target_tablename:
        target_df = df2
    df3 = dequantify_df(df2)
    df3.to_sql(projection_tablename[i], ingest_engine, schema=ingest_schema,
               if_exists="append", index=False,
               method=osc.TrinoBatchInsert(batch_size = 4000, verbose = True))

osc._do_sql(f"select count (*) from {target_tablename}", ingest_engine, verbose=True)
osc._do_sql(f"select count (*) from {trajectory_tablename}", ingest_engine, verbose=True)

constructed fully qualified table name as: "demo_dv.itr_target_data"
inserting 3456 records
  ('AES Corp.', '', 'US00130H1059', 'Electricity Utilities', 2019, 0.4694201529482514, 'CO2e * kt / gigawatt_hour', 0.0033854678781417243, 'CO2e * kt / gigawatt_hour', 0.09886954097301039, 'CO2e * kt / gigawatt_hour', 0.47280562082639316, 'CO2e * kt / gigawatt_hour', 0.5716751617994035, 'CO2e * kt / gigawatt_hour')
  ('AES Corp.', '', 'US00130H1059', 'Electricity Utilities', 2020, 0.45023905056771635, 'CO2e * kt / gigawatt_hour', 0.002661965942231325, 'CO2e * kt / gigawatt_hour', 0.07618253081601555, 'CO2e * kt / gigawatt_hour', 0.4529010165099477, 'CO2e * kt / gigawatt_hour', 0.4873660963812319, 'CO2e * kt / gigawatt_hour')
  ('AES Corp.', '', 'US00130H1059', 'Electricity Utilities', 2021, 0.5868161335415716, 'CO2e * kt / gigawatt_hour', 0.003607598807815507, 'CO2e * kt / gigawatt_hour', 0.10469556468170993, 'CO2e * kt / gigawatt_hour', 0.5904237323493872, 'CO2e * kt / gigawatt_hour', 0.4588295

[(3584,)]

In [16]:
emissions_tablename = f"{itr_prefix}emissions_data"

osc._do_sql(f"drop table if exists {ingest_catalog}.{ingest_schema}.{emissions_tablename}", ingest_engine, verbose=True)

# schema = osc.create_table_schema_pairs(target_data)

co2_schema = ',\n'.join([f"co2_{scope.lower()}_by_year double, co2_{scope.lower()}_by_year_units varchar" for scope in EScope.get_scopes()])

emissions_tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{emissions_tablename}(
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    year bigint,
    {co2_schema}) with (
    format = 'ORC',
    partitioning = ARRAY['year']
)
"""

# Create emissions_data table using production_df and math
emissions_df = production_df.merge(target_df.drop(columns=['company_name', 'company_lei', 'sector']), on=['company_id', 'year'])
emissions_df = emissions_df[~emissions_df.company_id.isin(['DE000SYM9999', 'NO0010657505', 'GB0000961622', 'DE000BASF111', 'IE00BZ12WP82', 'FR0004024222'])]
print(emissions_df.index.names)
for scope in EScope.get_scopes():
    mask = emissions_df[f"ei_{scope.lower()}_by_year"].isna()
    emissions_df.loc[mask, f"ei_{scope.lower()}_by_year"] = emissions_df['production_by_year'].map(lambda x: Q_(np.nan, f"Mt CO2 / ({str(x.u)})"))
    emissions_df[f"co2_{scope.lower()}_by_year"] = emissions_df['production_by_year'].mul(emissions_df[f"ei_{scope.lower()}_by_year"]).astype("pint[Mt CO2e]")
    emissions_df = emissions_df.drop(columns=f"ei_{scope.lower()}_by_year")
emissions_df = emissions_df.drop(columns='production_by_year')
df = dequantify_df(emissions_df)
df.to_sql(emissions_tablename, ingest_engine, schema=ingest_schema,
          if_exists="append", index=False,
          method=osc.TrinoBatchInsert(batch_size = 4000, verbose = True))

drop table if exists osc_datacommons_dev.demo_dv.itr_emissions_data
[None]
constructed fully qualified table name as: "demo_dv.itr_emissions_data"
inserting 2880 records
  ('096770.SK', 2019, 'SK Innovation', '988400PXP70BWVSJVF07', 'Energy', 10.117595999999999, 'CO2e * megametric_ton', 2.400554, 'CO2e * megametric_ton', 151.79999999999998, 'CO2e * megametric_ton', 12.518149999999999, 'CO2e * megametric_ton', 164.31815, 'CO2e * megametric_ton')
  ('096770.SK', 2020, 'SK Innovation', '988400PXP70BWVSJVF07', 'Energy', 11.099542743700857, 'CO2e * megametric_ton', 2.733049295351212, 'CO2e * megametric_ton', 131.47, 'CO2e * megametric_ton', 12.093238, 'CO2e * megametric_ton', 143.56323799999998, 'CO2e * megametric_ton')
  ('096770.SK', 2021, 'SK Innovation', '988400PXP70BWVSJVF07', 'Energy', 8.963148159254997, 'CO2e * megametric_ton', 2.501464925055745, 'CO2e * megametric_ton', 118.35000000000001, 'CO2e * megametric_ton', 10.9, 'CO2e * megametric_ton', 129.25, 'CO2e * megametric_ton')
  ...

In [17]:
emissions_df

Unnamed: 0,company_id,year,company_name,company_lei,sector,co2_s1_by_year,co2_s2_by_year,co2_s3_by_year,co2_s1s2_by_year,co2_s1s2s3_by_year
0,096770.SK,2019,SK Innovation,988400PXP70BWVSJVF07,Energy,10.117595999999999,2.400554,151.79999999999998,12.518149999999999,164.31815
1,096770.SK,2020,SK Innovation,988400PXP70BWVSJVF07,Energy,11.099542743700857,2.733049295351212,131.47,12.093238,143.56323799999998
2,096770.SK,2021,SK Innovation,988400PXP70BWVSJVF07,Energy,8.963148159254997,2.501464925055745,118.35000000000001,10.9,129.25
3,096770.SK,2022,SK Innovation,988400PXP70BWVSJVF07,Energy,8.144233428818712,2.2729195034693954,117.9650288669896,10.551102884701516,118.55075643308359
4,096770.SK,2023,SK Innovation,988400PXP70BWVSJVF07,Energy,7.395703336263191,2.0640172585653014,117.5813099754093,10.2133735856474,108.67201970219135
...,...,...,...,...,...,...,...,...,...,...
3067,US98389B1008,2046,"Xcel Energy, Inc.",LGJNMI9GH8XIDG5RCM61,Electricity Utilities,2.1158195963480075,,,,
3068,US98389B1008,2047,"Xcel Energy, Inc.",LGJNMI9GH8XIDG5RCM61,Electricity Utilities,1.512704254983762,,,,
3069,US98389B1008,2048,"Xcel Energy, Inc.",LGJNMI9GH8XIDG5RCM61,Electricity Utilities,0.9629449574412781,,,,
3070,US98389B1008,2049,"Xcel Energy, Inc.",LGJNMI9GH8XIDG5RCM61,Electricity Utilities,0.4604972513316361,,,,
