# ITR Data Pipeline

The ITR data pipeline organizes and assembles data needed for the ITR tool.  The data may come from many sources, but the output of this pipeline is a complete, consistent dataset that can be fully interrogated by the ITR tool.  If users wish to add additional data or analyze additional portfolio companies, they must create a new dataset using this pipeline.

These are the data needed to create the ITR dataset:
* Global Parameters (just for reference--we do nothing with them here)
* Industry Data (Sector Projections aka Benchmarks)
* Portfolio Data (Must cover all the stocks a user may query)
* Company Data (Must cover all companies in all possible portfolio universes)
* Automization (Must cover all years and scenarios a user may query)

The ITR tool can create secondary datasets:
* Cumulative emissions targets trajectories
* Cumulative emissions budgets
* Target and trajectory overshoot/undershoot ratios
* Target and trajectory temperature scores

These secondary datasets are not the concern of this pipeline.

### Environment variables and dot-env

The following cell looks for a "dot-env" file in some standard locations,
and loads its contents into `os.environ`.

In [None]:
from dotenv import dotenv_values, load_dotenv
import os
import pathlib
import numpy as np
import pandas as pd
import trino
from sqlalchemy.engine import create_engine
import osc_ingest_trino as osc

# import python_pachyderm

Define Environment and Execution Variables

In [None]:
# Load environment variables from credentials.env
osc.load_credentials_dotenv()

In [None]:
import osc_ingest_trino as osc

# from ITR.data.osc_units import *
import io

In [None]:
# See data-platform-demo/pint-demo.ipynb for quantify/dequantify functions

import warnings  # needed until quantile behaves better with Pint quantities in arrays
from pint import set_application_registry, Quantity
from pint_pandas import PintArray, PintType
from openscm_units import unit_registry

# openscm_units doesn't make it easy to set preprocessors.  This is one way to do it.
unit_registry.preprocessors = [
    lambda s1: s1.replace("passenger km", "passenger_km"),
    lambda s2: s2.replace("BoE", "boe"),
]

PintType.ureg = unit_registry
ureg = unit_registry
set_application_registry(ureg)
Q_ = ureg.Quantity
PA_ = PintArray

ureg.define("CO2e = CO2 = CO2eq = CO2_eq")
ureg.define("Fe = [iron] = Steel")
ureg.define("iron = Fe")
ureg.define("Al = [aluminum] = Aluminum")
ureg.define("aluminum = Al")
ureg.define("Cement = [cement]")
ureg.define("cement = Cement")

# These are for later
ureg.define("fraction = [] = frac")
ureg.define("percent = 1e-2 frac = pct = percentage")
ureg.define("ppm = 1e-6 fraction")

ureg.define("USD = [currency]")
ureg.define("EUR = nan USD")
ureg.define("JPY = nan USD")

ureg.define("btu = Btu")
ureg.define("boe = 5.712 GJ")

# Transportation activity

ureg.define("vehicle = [vehicle] = v")
ureg.define("passenger = [passenger] = p = pass")
ureg.define("vkm = vehicle * kilometer")
ureg.define("pkm = passenger * kilometer")
ureg.define("tkm = tonne * kilometer")

ureg.define("hundred = 1e2")
ureg.define("thousand = 1e3")
ureg.define("million = 1e6")
ureg.define("billion = 1e9")
ureg.define("trillion = 1e12")
ureg.define("quadrillion = 1e15")

### S3 and boto3

In [None]:
import boto3

s3_source = boto3.resource(
    service_name="s3",
    endpoint_url=os.environ["S3_LANDING_ENDPOINT"],
    aws_access_key_id=os.environ["S3_LANDING_ACCESS_KEY"],
    aws_secret_access_key=os.environ["S3_LANDING_SECRET_KEY"],
)
source_bucket = s3_source.Bucket(os.environ["S3_LANDING_BUCKET"])

### Connecting to Trino with sqlalchemy

In the context of the Data Vault, this pipeline operates with full visibiilty into all the data it prepares for the ITR tool.  When the data is output, it is labeled so that the Data Vault can enforce its data management access rules.

In [None]:
ingest_catalog = "osc_datacommons_dev"
ingest_schema = "sandbox"
dera_schema = "sandbox"
dera_prefix = "dera_"
gleif_schema = "sandbox"
rmi_schema = "sandbox"
iso3166_schema = "sandbox"
essd_schema = "sandbox"
essd_prefix = "essd_"
demo_schema = "demo_dv"

engine = osc.attach_trino_engine(verbose=True, catalog=ingest_catalog)

## Global Parameters

These parameters are set/selected by the ITR tool.  They are included here for reference only (the following is not live code).

Create the ISIC-to-Sector table manually until we have a proper sector mapping table

In [None]:
i2s_df = pd.DataFrame({"isic": [2410, 4010], "sector": ["Steel", "Electricity Utilities"]}).convert_dtypes()

ingest_table = "isic_to_sector"
drop_table = engine.execute(f"drop table if exists {demo_schema}.{ingest_table}")
drop_table.fetchall()

columnschema = osc.create_table_schema_pairs(i2s_df)

tabledef = f"""
create table if not exists {ingest_catalog}.{demo_schema}.{ingest_table}(
{columnschema}
) with (
    format = 'ORC',
    partitioning = array['bucket(isic,20)']
)
"""

print(tabledef)
qres = engine.execute(tabledef)
print(qres.fetchall())
i2s_df.to_sql(
    ingest_table,
    con=engine,
    schema=demo_schema,
    if_exists="append",
    index=False,
    method=osc.TrinoBatchInsert(batch_size=2000, verbose=True),
)

## Portfolio Data

The user will ultimately supply portfolio selection and position information to the ITR tool as part of the weighting calculations.  This part of the pipeline just collects the LEI and ISIN information for companies we should expect to analyze (i.e., companies for which we have fundamental financial information, production, intensity, and target information, in sectors for which we have benchmark projections).

Because this pipeline does the full pre-computation of data for the tool, there is no sense carrying forward information that is not fully closed.  I.e., there's no reason to carry forward an LEI:ISIN relationship if there is no financial, production, or target information related to that LEI and/or ISIN.  The user does not add such data later; the data is collected and fully processed by this pipeline now.

### Get LEI/ISIN data

RMI handes us data already matched with LEIs and ISINs.  Other lists of company names may require us to stitch that together manually.

In [None]:
# TODO: sort why some notorious utilities are missing LEIs in the following query--bad source data?
rmi_lei_isin = pd.read_sql(
    f"select DISTINCT parent_name, parent_lei, parent_isin from {rmi_schema}.utility_information where parent_name IS NOT NULL",
    engine,
)
# Fabricate LEIs for entities that have none
print(sorted(list(rmi_lei_isin.loc[rmi_lei_isin.parent_lei.isnull()].parent_name)))
rmi_lei_isin.loc[rmi_lei_isin.parent_lei.isnull(), "parent_lei"] = rmi_lei_isin.apply(
    lambda x: f"RMI{x.name:017}", axis=1
)
rmi_lei_isin.loc[rmi_lei_isin.parent_isin.isnull(), "parent_isin"] = rmi_lei_isin.apply(
    lambda x: f"ZZ{x.name:011}", axis=1
)
# Install LEIs whose hierarchy levels don't match what we matched for SEC DERA data
rmi_lei_isin.loc[rmi_lei_isin.parent_name == "PG&E Corp.", "parent_lei"] = "8YQ2GSDWYZXO2EDN3511"
rmi_lei_isin.loc[rmi_lei_isin.parent_name == "Verso Corp.", "parent_lei"] = "549300FODXCTQ8DGT594"
rmi_lei_isin.loc[rmi_lei_isin.parent_name == "Verso Corp.", "parent_isin"] = "US92531L2079"
rmi_lei_dict = dict(zip(rmi_lei_isin.parent_lei, rmi_lei_isin.parent_isin))

Implement an *ad hoc* ingestion pipeline for Steel portfolio.  Later we will ingest steel production data.  We use this only to define the universe, not for actual investment information.

In [None]:
steel_idx = pd.read_csv(
    os.environ.get("PWD") + f"/itr-data-pipeline/data/external/mdt-steel-portfolio.csv",
    header=0,
    sep=";",
    usecols=["company_name", "company_lei", "company_id"],
    dtype=str,
    engine="c",
)
# display(steel_idx)

Prepare GLEIF matching data for SEC DERA data.  In the future, such matching will use the ESG Entity-Matching pipeline (https://github.com/os-climate/financial-entity-cleaner/tree/version_0.1.0).

In [None]:
gleif_file = s3_source.Object(os.environ["S3_LANDING_BUCKET"], "mtiemann-GLEIF/DERA-matches.csv")
gleif_file.download_file(f"/tmp/dera-gleif.csv")
gleif_df = pd.read_csv(f"/tmp/dera-gleif.csv", header=0, sep=",", dtype=str, engine="c")
gleif_dict = dict(zip(gleif_df.name, gleif_df.LEI))
del gleif_df

# Many of the following ISINs are bonds, but some are also stocks (on various exchanges)
# But we don't need to load and match here, because the portfolio has the ISINs
if False:
    gleif_isin_file = s3_source.Object(os.environ["S3_LANDING_BUCKET"], "mtiemann-GLEIF/ISIN_LEI_20211009.csv")
    gleif_isin_file.download_file(f"/tmp/ISIN_LEI_20211009.csv")
    gleif_isins = pd.read_csv(f"/tmp/ISIN_LEI_20211009.csv", header=0, sep=",", dtype=str, engine="c")

Create a very simple entity matcher, cleaning up slight variations in company names between RMI's entity names, the SEC's entity names, and GLEIF's entity names.

Commented out are names we would have to fix if there were SEC data for them.  But because not, we'll never match what's not there in the first place.

In [None]:
# gleif_dict['Basin Electric Power Coop'.upper()] = gleif_dict['BASIN ELECTRIC POWER COOPERATIVE']
# gleif_dict['Big Rivers Electric Corp'.upper()] = gleif_dict['BIG RIVERS ELECTRIC CORPORATION']
# gleif_dict['CHUGACH ELECTRIC ASSOCIATION INC'] = gleif_dict['CHUGACH ELECTRIC ASSN INC.']
gleif_dict["Cleco Partners LP".upper()] = gleif_dict["CLECO CORPORATE HOLDINGS LLC"]
# gleif_dict['Golden Spread Electric Coop., Inc'.upper()] = gleif_dict['GOLDEN SPREAD ELECTRIC COOPERATIVE, INC.']
gleif_dict["MIDWEST ENERGY INC"] = "549300O4B5CVWMKUES27"
gleif_dict["OG&E Energy".upper()] = gleif_dict["OGE ENERGY CORP."]
# gleif_dict['Ohio Valley Electric Corp'.upper()] = gleif_dict['OHIO VALLEY ELECTRIC CORPORATION']
gleif_dict["Old Dominion Electric Coop".upper()] = gleif_dict["OLD DOMINION ELECTRIC COOPERATIVE"]
gleif_dict["PG&E Corp.".upper()] = gleif_dict["PG&E CORP"]
gleif_dict["Tri-State Generation & Transmission Association".upper()] = gleif_dict[
    "TRI-STATE GENERATION & TRANSMISSION ASSOCIATION, INC."
]
gleif_dict["DOMINION ENERGY INC"] = "ILUL7B6Z54MRYCF6H308"

gleif_dict["GROUP SIMEC SA DE CV"] = "529900LCYCXPA0TZEU09"

gleif_1 = {k.split(",")[0].split(" ")[0]: v for k, v in gleif_dict.items()}
gleif_2 = {" ".join(k.split(",")[0].split(" ")[0:2]): v for k, v in gleif_dict.items()}


def gleif_match(x):
    x = x.split(",")[0]
    if x in gleif_dict:
        return gleif_dict[x]
    x = x.replace(".", "")
    if x in gleif_dict:
        return gleif_dict[x]
    x2 = " ".join(x.split(" ")[0:2])
    if x2 in gleif_2:
        return gleif_2[x2]
    if " " not in x and x in gleif_1:
        return gleif_1[x]
    return None

Collect the universe of company names for the sectors we cover.  Steel sector is SIC 3310-3317. Electricity Utilities is SIC 4911 (but also 4931-4932 and 4991).

Some conglomerates have more general SIC codes that hide their activities in sectors of interest.  Others report those SIC codes within reportable segements.
Without more detailed SEC DERA data (available in an S3 bucket but not yet processed as a pipeline), we will not collect the company names we need to collect.

In [None]:
sec_lei_isin = pd.read_sql(
    f"""
select DISTINCT F.name, F.lei, F.sic
from {dera_schema}.financials_by_lei F
where (sic=4911 or sic=4931 or sic=4932 or sic=4991)
      or (sic>=3310 and sic<=3317)
""",
    engine,
)
sec_lei_isin.loc[sec_lei_isin.name == "DOMINION ENERGY INC", "lei"] = "ILUL7B6Z54MRYCF6H308"
sec_lei_isin.loc[sec_lei_isin.name == "GROUP SIMEC SA DE CV", "lei"] = "529900LCYCXPA0TZEU09"
sec_lei_isin.loc[sec_lei_isin.name == "ENEL GENERACION CHILE S.A.", "lei"] = "549300PVHXUFEIE6LY50"
sec_lei_isin.loc[sec_lei_isin.name == "POSCO HOLDINGS INC.", "lei"] = "988400E5HRVX81AYLM04"
sec_lei_isin.loc[sec_lei_isin.name == "ARCHAEA ENERGY INC.", "lei"] = "549300ZBE567NNMH7V89"
sec_lei_isin.loc[sec_lei_isin.name == "CLEANSPARK, INC.", "lei"] = "254900VO7KBRJQDGY810"
sec_lei_isin.loc[sec_lei_isin.name == "ALGOMA STEEL GROUP INC.", "lei"] = "549300Q5EU337A1XCX27"
sec_lei_isin.loc[sec_lei_isin.name == "ECO WAVE POWER GLOBAL AB (PUBL)", "lei"] = "5493003GP1XAFTYRJM76"

# CPFL ENERGIA S.A.
sec_lei_isin.loc[sec_lei_isin.name == "CPFL ENERGY INC", "lei"] = "529900GBWSBDXN8GGM28"

# GRUPOSIMEC, S.A.B. de C.V.
sec_lei_isin.loc[sec_lei_isin.name == "GRUPO SIMEC, S.A.B. DE C.V.", "lei"] = "529900LCYCXPA0TZEU09"

# PAMPA ENERGIA S A 254900QNIK0CVURGML24
sec_lei_isin.loc[sec_lei_isin.name == "PAMPA ENERGY INC.", "lei"] = "254900QNIK0CVURGML24"

# COMPANHIA ENERGETICA DE MINAS GERAIS CEMIG
sec_lei_isin.loc[sec_lei_isin.name == "ENERGY CO OF MINAS GERAIS", "lei"] = "254900W703PXLDSEM056"

# Centrais Elétricas Brasileiras S/A
sec_lei_isin.loc[sec_lei_isin.name == "BRAZILIAN ELECTRIC POWER CO", "lei"] = "254900I8KYDELP4B4Z08"

missing_leis = sec_lei_isin[sec_lei_isin.lei.isna()]
sec_lei_isin.dropna(inplace=True)
print("The following companies are missing LEI information and will be dropped:")
display(missing_leis)

We create a theoretical portfolio that conveniently contains all available LEI and ISIN information, meaning we don't need to do entity matching or ISIN matching.

Other portfolios may need a lot more work before they can be used to precompute other data.  The code above are samples of the kind of extra data/processing needed for such portfolios.

In [None]:
rmi_idx = rmi_lei_isin.rename(
    columns={
        "parent_name": "company_name",
        "parent_lei": "company_lei",
        "parent_isin": "company_id",
    }
)
# rmi_idx.insert(1, 'company_lei', portfolio_df.company_name.str.upper().map(gleif_match))
# if rmi_idx.company_lei.isna().any():
#     display(rmi_idx[rmi_idx.company_lei.isna()])
rmi_idx.loc[rmi_idx.company_id.isna(), "company_id"] = rmi_idx.apply(lambda x: f"ZZ{x.name:010}", axis=1)

print(f"Number of RMI portfolio copmanies = {len(rmi_idx)}")

Show list of RMI companies that use made-up LEIs or ISINs

In [None]:
rmi_idx[rmi_idx.company_lei.str.startswith("RMI")]

Add Steel company portfolio

In [None]:
portfolio_idx = pd.concat([rmi_idx, steel_idx])
portfolio_idx = portfolio_idx.convert_dtypes()

print(f"Number of total portfolio companies = {len(portfolio_idx)}")

### Company Data

The SIC-to-ISIC table is an open workstream item: https://github.com/os-climate/itr-data-pipeline/issues/1

### Capture a list of the companies for which we have good financial info

We limit our view to the companies in our portfolio.  The user can prioritize whether this is the best source of revenue, market cap, etc., or whether they prefer another source.

Note for future reference: Berkshire Hathaway has one line of business for Energy and another for Steel.  We don't yet have line-of-business info because we use summary data from SEC DERA, not the detailed Notes version of the dataset.

In [None]:
ingest_table = "portfolio_universe"

drop_table = engine.execute(f"drop table if exists {ingest_schema}.{ingest_table}")
drop_table.fetchall()

columnschema = osc.create_table_schema_pairs(portfolio_idx, typemap={"datetime64[ns]": "timestamp(6)"})

tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{ingest_table}(
{columnschema}
) with (
    format = 'ORC',
    partitioning = array['bucket(company_lei, 20)']
)
"""
print(tabledef)
create_table = engine.execute(tabledef)
print(create_table.fetchall())
portfolio_idx.to_sql(
    ingest_table,
    con=engine,
    schema=ingest_schema,
    if_exists="append",
    index=False,
    method=osc.TrinoBatchInsert(batch_size=5000, verbose=True),
)

### Create a list with metric labels embedded in the output for easy reading...

Highlight any rows that have NULL data

### Capture and print a list of companies with financial info

Financial information is part of the "fundamental data" we need for the ITR portfolio companies.  The other part is base year production, emission, and intensity data.  We query the two separately because we have a unified source of truth for the former (SEC DERA) but multiple sources for the latter (RMI for Electric Utilities and MDT for Steel).

### Financial info:
* Company Name, LEI, ISIN, year
* ISIC Code (for Sector)
* Country and Region
* Revenue, Market Cap, Enterprise Value, Assets, Cash

We currently focus exclusively on data from 2019 as our base year

In [None]:
base_financial_sql = f"""
select DISTINCT P.company_name, P.company_lei, P.company_id,
       F.country, UN.region_ar6_10 as region,
       if(S2I.isic in (2410, 3310, 3312) or P.company_name='CLEVELAND-CLIFFS INC', 'Steel', 'Electricity Utilities') as sector,
       'equity' as exposure, 'USD' as currency,
       year(F.ddate) as year,
       F.market_cap_usd as company_market_cap,
       F.revenue_usd as company_revenue,
       F.market_cap_usd+F.debt_usd-F.cash_usd as company_ev,
       F.market_cap_usd+F.debt_usd as company_evic,
       F.assets_usd as company_total_assets,
       F.cash_usd as company_cash_equivalents,
       F.debt_usd as company_debt
from {ingest_schema}.portfolio_universe as P
     left join {dera_schema}.financials_by_lei as F on F.lei=P.company_lei and year(F.ddate)=2019
     join {iso3166_schema}.countries as I on F.country=I.alpha_2
     join {essd_schema}.{essd_prefix}regions as UN on I.alpha_3=UN.iso
     -- join {dera_schema}.{dera_prefix}sub as S on S.cik=F.cik
     -- left join {rmi_schema}.utility_information as U on U.parent_lei=P.company_lei
     -- left join {gleif_schema}.gleif_isin_lei G on G.lei=P.lei and G.isin=U.parent_isin
     left join {dera_schema}.sic_isic as S2I on S2I.sic=F.sic
     -- left join {rmi_schema}.operations_emissions_by_fuel as E on U.respondent_id=E.respondent_id and year(E.year)=year(F.ddate)
-- where E.owned_or_total='owned'
group by P.company_name, P.company_lei, P.company_id,
       F.country, UN.region_ar6_10,
       if(S2I.isic=2410 or P.company_name='CLEVELAND-CLIFFS INC', 'Steel', 'Electricity Utilities'),
       6, 7, -- exposure, currency
       year(F.ddate),
       F.market_cap_usd, F.revenue_usd, F.market_cap_usd+F.debt_usd-F.cash_usd, F.market_cap_usd+F.debt_usd, F.assets_usd, F.cash_usd, F.debt_usd
order by P.company_name
limit 200
"""

### Emissions/Production info
* Company Name, LEI, ISIN (join axis with financial info)
* Sector (inferred from RMI data as a source rather than ISIC)
* Production (in whatever units -- we need units in either metadata or a column or as part of the data element iselft)
* S1, S2, S3 emissions (in megametric tons CO2e)
* S1, S2, S3 emissions intensity (emissions / production, in whatever units this resolves to)

We currently focus exclusively on data from 2019 as our base year

Note that RMI data is S1 only (own generation); we use zero as S2 value

In [None]:
# 'sector', 's1_co2', 's2_co2', 's3_co2', 's1_ei', 's2_ei', 's3_ei', 'production'
scopes = ["s1", "s2", "s3"]

emissions_sql = f"""
select DISTINCT P.company_name, P.company_lei, P.company_id,
       'Electricity Utilities' as sector, year(E.year) as year,
       sum(E.emissions_co2 + (265/1000000.0)*coalesce(E.emissions_nox, 0)) as ghg_s1, 0 as ghg_s2, NULL as ghg_s3,
       sum(E.emissions_co2 + (265/1000000.0)*coalesce(E.emissions_nox, 0)) / sum(E.generation) as ei_s1, 0 as ei_s2, NULL as ei_s3,
       sum(E.generation) as production
from {ingest_schema}.portfolio_universe as P
     join {rmi_schema}.utility_information as U on U.parent_lei=P.company_lei
     join {rmi_schema}.operations_emissions_by_fuel as E on U.respondent_id=E.respondent_id
where year(E.year)>=2014 and year(E.year)<2023
   and P.company_lei!='529900L26LIS2V8PWM23' -- American States Water has negative/zero production values that mess things up
-- and E.owned_or_total='owned'
group by P.company_name, P.company_lei, P.company_id, 3, year(E.year)
order by P.company_name
"""

### `financial_df` contains all the base year (2019) financial, production, and emissions data

For now our benchmark data covers only North America and Europe.  Over time, we expect additional regions (possibly on a per-sector basis).

In [None]:
financial_df = pd.read_sql(
    base_financial_sql,
    engine,
    index_col=["company_name", "company_lei", "company_id", "sector"],
).convert_dtypes()
financial_df.region = financial_df.region.apply(
    lambda x: x if x in ["Asia", "Europe", "North America"] else "Global"
).astype("string")
financial_df

### `emissions_df` contains all the base year (2019) production and emissions data

In [None]:
rmi_emissions_df = pd.read_sql(
    emissions_sql,
    engine,
    index_col=["company_name", "company_lei", "company_id", "sector"],
).convert_dtypes()
for scope in scopes:
    rmi_emissions_df["ghg_" + scope] = rmi_emissions_df["ghg_" + scope].astype("pint[Mt CO2]")
    rmi_emissions_df["ei_" + scope] = rmi_emissions_df["ei_" + scope].astype("pint[Mt CO2/TWh]")
rmi_emissions_df["production"] = rmi_emissions_df["production"].astype("pint[TWh]")
rmi_emissions_df["ghg_s1s2"] = rmi_emissions_df["ghg_s1"] + rmi_emissions_df["ghg_s2"]
rmi_emissions_df["ei_s1s2"] = rmi_emissions_df["ghg_s1s2"] / rmi_emissions_df["production"]
template_rmi_df = rmi_emissions_df.pivot(index=None, columns="year")

# Put column names into YYYY_metric order (Multi-index has this order inverted)
template_rmi_df.columns = template_rmi_df.columns.map(lambda x: f"{x[1]}_{x[0]}")
template_rmi_df = template_rmi_df.loc[:, ~template_rmi_df.columns.str.contains("_ei_")]
display(template_rmi_df)

### Collect emissions/production info from the MDT Steel data
* Company Name, LEI, ISIN (join axis with financial info)
* Sector (inferred as Steel from source)
* Production (in whatever units -- we need units in either metadata or a column or as part of the data element itself)
* S1, S2, S3 emissions (in whatever units of CO2e)
* S1, S2, S3 emissions intensity (emissions / production, in whatever units this resolves to)

If a company has no emissions or production information, we don't carry it forward as data (even if it does have revenue, earnings, etc.)

In [None]:
steel_wb = pd.read_excel(
    os.environ.get("PWD") + f"/itr-data-pipeline/data/external/mdt-steel-demo.xlsx",
    sheet_name=None,
)
steel_production = steel_wb["Steel Fe_tons"].dropna(axis=1, how="all")
steel_production.set_index(steel_production.columns[0:3].to_list(), inplace=True)
steel_production = steel_production.dropna(axis=0, how="all")
steel_production = steel_production.astype("pint[t Steel]")
steel_co2 = {}
steel_ei = {}
for scope in scopes:
    steel_co2[scope] = steel_wb[f"Steel CO2e {scope.upper()}"].dropna(axis=1, how="all")
    steel_co2[scope].set_index(steel_co2[scope].columns[0:3].to_list(), inplace=True)
    steel_co2[scope] = steel_co2[scope].dropna(axis=0, how="all")
    steel_co2[scope] = steel_co2[scope].astype("pint[t CO2]")
    steel_ei[scope] = (steel_co2[scope] / steel_production).dropna(how="all")

In [None]:
def rename_column_emissions(df, scope):
    df = df.loc[:, 2014:2020]
    df.columns = df.columns.map(lambda x: f"{x}_ghg_{scope}")
    return df


template_steel_co2 = pd.concat([rename_column_emissions(steel_co2[scope], scope) for scope in scopes], axis=1)
for year in range(2014, 2021):
    template_steel_co2.insert(
        len(template_steel_co2.columns) - 5,
        f"{year}_ghg_s1s2",
        steel_co2["s1"][year] + steel_co2["s2"][year],
    )
template_steel_co2

In [None]:
template_steel_production = steel_production.loc[:, 2014:2020]
template_steel_production.columns = template_steel_production.columns.map(lambda x: f"{x}_production")
template_steel_production

In [None]:
template_steel_df = pd.concat([template_steel_co2, template_steel_production], axis=1)
template_steel_df.insert(0, "sector", "Steel")
template_steel_df.set_index(["sector"], append=True, inplace=True)
template_steel_df.insert(0, "emissions_metric", "t CO2")
template_steel_df.insert(1, "production_metric", "t Steel")
template_steel_df

In [None]:
pd.options.display.max_rows = 99
pd.options.display.max_columns = 49
template_df = (
    pd.concat([financial_df, pd.concat([template_steel_df, template_rmi_df])], axis=1)
    .dropna(thresh=16)
    .drop(columns=["company_cash_equivalents", "company_debt"], axis=1)
)
template_df.loc[
    pd.IndexSlice[:, :, :, ["Electricity Utilities"]],
    ["emissions_metric", "production_metric"],
] = ["Mt CO2", "TWh"]
template_df = template_df.reset_index()
cols = template_df.columns.tolist()
cols = cols[:3] + cols[4:6] + [cols[3]] + cols[6:]
template_df = template_df[cols]
for col in cols:
    if col.startswith("2020_"):
        col_index = template_df.columns.get_loc(col)
        for year in [2022, 2021]:
            newcol = col.replace("2020", str(year))
            template_df.insert(col_index + 1, newcol, np.nan)
            if "_ghg_" in newcol:
                template_df.loc[:, newcol] = template_df["emissions_metric"].map(lambda x: Q_(np.nan, x))
            else:
                template_df.loc[:, newcol] = template_df["production_metric"].map(lambda x: Q_(np.nan, x))
display(template_df.sample(15))
pd.reset_option("display.max_rows")
pd.reset_option("display.max_columns")

In [None]:
with pd.ExcelWriter("../data/processed/template-20220415-output.xlsx", datetime_format="YYYY") as writer:
    template_df.to_excel(writer, sheet_name="ITR input data", index=False)

In [None]:
stop!

### Load emissions target data

The RMI power plant data is valid for Scope 1 emissions only.

In [None]:
engine.execute(f"describe {rmi_schema}.emissions_targets").fetchall()

### `targets_df` has all the historical and target emissions data
### `trajectory_df` is derived from historical target emissions data

We also preserve RMI's 1.5 degree target info, which can be presented as a trajectory to compare/contrast corporate targets with RMI's best policy recommendations
* rtg_df is the RMI contribution to targets_df (RMI data frame)
* mtg_df is the Steel contribution to targets_df (MDT data frame)

We do not consider targets/emissions for WIRES ONLY utilities (who have no generation of their own).

We set the LEI information based on our hand-curated GLEIF table, not the LEI info in the RMI and SEC data tables

In [None]:
# Emissions targets are now segregated by states, but we care more about rolling them up to the company level.
# Therefore we sum absolutes (emissions and generation) and re-compute intensities based on the aggregated amounts.

rtg_df = pd.read_sql(
    f"""
select ET.parent_name as company_name, ET.respondent_id, 'Electricity Utilities' as sector, year(ET.year) as year,
       sum(co2_target) as co2_s1_target,
       sum(co2_historical) as co2_s1_historical,
       sum(co2_target_all_years) as co2_s1_target_all_years,
       sum(co2_1point5C) as co2_s1_1point5C,
       sum(generation_historical) as production_historical,
       sum(generation_projected) as production_projected,
       sum(generation_1point5C) as production_1point5C
from {rmi_schema}.emissions_targets ET
     join (select respondent_id, year
           from {rmi_schema}.operations_emissions_by_tech
           where technology_eia!='Batteries' and technology_eia!='Hydroelectric Pumped Storage'
           group by respondent_id, year) EM
           on ET.respondent_id=EM.respondent_id and ((year(ET.year)>2020 and year(EM.year)=2020) or (ET.year=EM.year) or ((year(ET.year)<2005 and year(EM.year)=2005) ))
     -- join (select parent_name, parent_lei from {rmi_schema}.utility_information group by parent_name, parent_lei) U
     --       on ET.parent_name=U.parent_name
     -- join {dera_schema}.financials_by_lei as F on F.lei=U.parent_lei
where ET.target_type='All'
group by ET.parent_name, ET.respondent_id, year(ET.year)
order by company_name, year
""",
    engine,
)  # parse_dates=['year']

# We set the LEI information based on our hand-curated GLEIF table, not the LEI info in the RMI and SEC data tables
rtg_df.insert(1, "company_lei", rtg_df.company_name.str.upper().map(gleif_match))
rtg_df.insert(2, "company_id", rtg_df.company_lei.map(rmi_lei_dict))
rtg_df.loc[rtg_df.production_historical > 0, "ei_s1_historical"] = (
    rtg_df.co2_s1_historical / rtg_df.production_historical
)
rtg_df["production_general"] = rtg_df[["production_historical", "production_projected"]].bfill(axis=1).iloc[:, 0]
rtg_df.loc[rtg_df.production_general > 0, "ei_s1_target"] = rtg_df.co2_s1_target / rtg_df.production_general
rtg_df.loc[rtg_df.production_general > 0, "ei_s1_target_all_years"] = (
    rtg_df.co2_s1_target_all_years / rtg_df.production_general
)
rtg_df.loc[rtg_df.production_1point5C > 0, "ei_s1_1point5C"] = rtg_df.co2_s1_1point5C / rtg_df.production_1point5C
rtg_df.drop(columns="production_general", inplace=True)
rtg_df = rtg_df[
    rtg_df.company_lei != "529900L26LIS2V8PWM23"
]  # American States Water has negative/zero production values that mess things up

In [None]:
for col in rtg_df.columns:
    if col.startswith("co2_"):
        rtg_df[col] = rtg_df[col].astype("pint[Mt CO2]")
    elif col.startswith("production_"):
        rtg_df[col] = rtg_df[col].astype("pint[TWh]")
    elif col.startswith("ei_"):
        rtg_df[col] = rtg_df[col].astype("pint[Mt CO2/TWh]")
rtg_df = rtg_df.convert_dtypes()
print(rtg_df.dtypes)
print(f"len(rtg_df) = {len(rtg_df)}")

### Fix target information comprehensively (mostly fixed with March 2022 update)

1. Where co2_target is set to zero before 2019 and then ramps up to a non-zero number before 2020, clear the target number and replace all target data with historical data
2. Where co2_target is NULL, generation_historical==1, and co2_intensity_historical==0, remove false generation_historical==1 data point.  There is never any generation before generators are operational.
3. Where co2_historical is non-NULL and non-zero, look for outlier data.  If the generation_historical for the outlier data is not an outlier in the generation data, recompute co2_intensity_historical and co2_historical based on non-outlier data
4. Where max(year) < 2020, discard forward-looking projections: they are represented elsewhere
5. Where production_projected is non-NULL and flatline from 2021-2050, replace with OECM production growth values for 'North America' region

In [None]:
print("Step 4: When data is exhausted prior to 2020, discard forward-looking projections represented elsewhere")

step4_df = rtg_df.loc[rtg_df.year == 2019, ["respondent_id", "production_historical"]].fillna(0)
step4_index = step4_df[step4_df.production_historical != 0]["respondent_id"]
print(f"Initial length of target dataset: {len(rtg_df)}")
print("respondent_id not in index")
print(sorted(rtg_df.loc[~rtg_df.respondent_id.isin(step4_index), "respondent_id"].drop_duplicates().tolist()))
rtg_df = rtg_df.loc[rtg_df.respondent_id.isin(step4_index)]
print(f"Resulting length of target dataset: {len(rtg_df)}")

The RMI targets only cover S1, so we don't need to compute the non-existent S2 and S3 numbers (until they do provide such).

In [None]:
def compute_sums_and_wavg(x):
    zero_Mt_CO2 = Q_(0.0, "Mt CO2")
    d = {
        "co2_s1_by_year": x["co2_s1_target_all_years"].sum(),
        "co2_s2_by_year": zero_Mt_CO2,
        "production_by_year": x[["production_historical", "production_projected"]].bfill(axis=1).iloc[:, 0].sum(),
    }
    return pd.Series(d, index=["co2_s1_by_year", "co2_s2_by_year", "production_by_year"])


with warnings.catch_warnings():
    warnings.simplefilter("ignore")

    rmi_targets_df = (
        rtg_df[rtg_df.year >= 2014]
        .groupby(["company_name", "company_lei", "company_id", "sector", "year"])  # grouping automagically sets index
        .apply(compute_sums_and_wavg)
        .sort_values(["company_name", "year"], ascending=[True, True])
    )
m = rmi_targets_df.production_by_year != 0

In [None]:
rmi_targets_df.loc[~m, "ei_s1_by_year"] = m.map(lambda x: Q_(np.nan, "Mt CO2/TWh"))
rmi_targets_df.loc[~m, "ei_s2_by_year"] = m.map(lambda x: Q_(np.nan, "Mt CO2/TWh"))
rmi_targets_df.loc[~m, "ei_s1s2_by_year"] = m.map(lambda x: Q_(np.nan, "Mt CO2/TWh"))
rmi_targets_df.loc[m, "ei_s1_by_year"] = rmi_targets_df.co2_s1_by_year / rmi_targets_df.production_by_year
rmi_targets_df.loc[m, "ei_s2_by_year"] = rmi_targets_df.co2_s2_by_year / rmi_targets_df.production_by_year
rmi_targets_df.loc[m, "ei_s1s2_by_year"] = (
    rmi_targets_df.co2_s1_by_year + rmi_targets_df.co2_s2_by_year
) / rmi_targets_df.production_by_year

In [None]:
rmi_targets_df.loc["Exelon Corp.", :, :, :]

In [None]:
steel_production.iloc[0:2]

In [None]:
mdt_production = (
    steel_production.melt(var_name="year", value_name="production_by_year", ignore_index=False)
    .dropna()
    .set_index(["year"], append=True)
)
# display(mdt_production)
mdt_co2 = pd.concat(
    [
        steel_co2[scope]
        .melt(var_name="year", value_name=f"co2_{scope}_by_year", ignore_index=False)
        .dropna()
        .set_index(["year"], append=True)
        for scope in scopes
    ],
    join="outer",
    axis=1,
)
# display(mdt_co2)
mdt_ei = pd.concat(
    [
        steel_ei[scope]
        .melt(var_name="year", value_name=f"ei_{scope}_by_year", ignore_index=False)
        .dropna()
        .set_index(["year"], append=True)
        for scope in scopes
    ],
    join="outer",
    axis=1,
)
# display(mdt_ei)

In [None]:
steel_targets_df = pd.concat([mdt_production, mdt_co2, mdt_ei], join="outer", axis=1)
steel_targets_df.insert(2, "sector", "Steel")
steel_targets_df.set_index(["sector"], append=True, inplace=True)
steel_targets_df = steel_targets_df.reorder_levels(
    order=["company_name", "company_lei", "company_id", "sector", "year"]
)
targets_df = pd.concat([rmi_targets_df, steel_targets_df])[
    [
        "production_by_year",
        "co2_s1_by_year",
        "co2_s2_by_year",
        "co2_s3_by_year",
        "ei_s1_by_year",
        "ei_s2_by_year",
        "ei_s3_by_year",
    ]
]

In [None]:
targets_df

In [None]:
targets_df.loc["WORTHINGTON INDUSTRIES INC"]

In [None]:
targets_df.unstack(level="year")["ei_s1_by_year"].sample(15).sort_index(level=["company_name"], ascending=[1])

In [None]:
traj_df = {}
traj_mdf = {}
traj_udf = targets_df.unstack(level="year")
for scope in scopes:
    # We start by copying the target data, but we will use only the historic and replace the projection
    traj_df[scope] = traj_udf[f"ei_{scope}_by_year"].copy()
    # By calculating 2014-2019, we miss the anomoly of 2020
    historic_progress = (traj_df[scope][2019] / traj_df[scope][2014]).dropna().map(lambda x: x.m)

    # There are wierd artifacts where energy storage systems have negative generation, so treat their progress as zero
    # If intensity is actually growing, cap trajectory at 1 (no progress).
    annualized_progress = historic_progress.where(historic_progress >= 0, 0).where(historic_progress <= 1, 1) ** (
        1 / (2019 - 2014)
    )

    for year in range(2020, 2051):
        traj_df[scope].loc[:, year] = traj_df[scope][2020] * annualized_progress ** (year - 2020)
    traj_mdf[scope] = (
        traj_df[scope]
        .melt(var_name="year", value_name=f"ei_{scope}_by_year", ignore_index=False)
        .set_index("year", append=True)
        .convert_dtypes()
    )

traj_mdf = pd.concat([*traj_mdf.values()], join="outer", axis=1)
traj_mdf.loc[targets_df.index.intersection(traj_mdf.index), "production_by_year"] = targets_df["production_by_year"]
display(traj_mdf.loc["CLEVELAND-CLIFFS INC"])

In [None]:
df = traj_mdf[["ei_s1_by_year", "ei_s2_by_year", "ei_s3_by_year"]].multiply(
    traj_mdf["production_by_year"], axis="index"
)
df.rename(
    columns={f"ei_{scope}_by_year": f"co2_{scope}_by_year" for scope in scopes},
    inplace=True,
)
trajectories_df = pd.concat([df, traj_mdf], axis=1)
trajectories_df = trajectories_df[[trajectories_df.columns[-1]] + list(trajectories_df.columns[0:-1])]
trajectories_df

In [None]:
targets_df.sort_index(
    level=["company_name", "company_lei", "company_id", "sector", "year"],
    ascending=[1, 1, 1, 1, 1],
    inplace=True,
)
trajectories_df.sort_index(
    level=["company_name", "company_lei", "company_id", "sector", "year"],
    ascending=[1, 1, 1, 1, 1],
    inplace=True,
)

In [None]:
targets_df.loc[(slice(None), slice(None), slice(None), slice(None), slice(2019, 2024))]

In [None]:
trajectories_df.loc[(slice(None), slice(None), slice(None), slice(None), slice(2019, 2024))]

### TODO: Implement Units

Intensity and Production data need Units to distinguish TWh of generation vs. Tons of Steel production

Company data is converted to USD by SEC_DERA ingestion for now, but should support any currencies in the future

In [None]:
# If DF_COL contains Pint quantities (because it is a PintArray or an array of Pint Quantities),
# return a two-column dataframe of magnitudes and units.
# If DF_COL contains no Pint quanities, return it unchanged.


def dequantify_column(df_col: pd.Series) -> pd.DataFrame:
    if type(df_col.values) == PintArray:
        return pd.DataFrame(
            {
                df_col.name: df_col.values.quantity.m,
                df_col.name + "_units": str(df_col.values.dtype.units),
            },
            index=df_col.index,
        )
    elif df_col.size == 0:
        return df_col
    elif df_col.map(lambda x: isinstance(x, Quantity)).any():
        values = df_col.map(lambda x: (x.m, x.u) if isinstance(x, Quantity) else x)
        return pd.DataFrame(
            {
                df_col.name: df_col.map(lambda x: x.m if isinstance(x, Quantity) else x),
                df_col.name + "_units": df_col.map(lambda x: str(x.u) if isinstance(x, Quantity) else None),
            },
            index=df_col.index,
        )
    else:
        return df_col


# Rewrite dataframe DF so that columns containing Pint quantities are represented by a column for the Magnitude and column for the Units.
# The magnitude column retains the original column name and the units column is renamed with a _units suffix.
def dequantify_df(df: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([dequantify_column(df[col]) for col in df.columns], axis=1)

In [None]:
# Because this DF comes from reading a Trino table, and because columns must be unqiue, we don't have to enumerate to ensure we properly handle columns with duplicated names


def requantify_df(df: pd.DataFrame) -> pd.DataFrame:
    units_col = None
    columns_reversed = reversed(df.columns)
    for col in columns_reversed:
        if col.endswith("_units"):
            if units_col:
                # We expect _units column to follow a non-units column
                raise ValueError
            units_col = col
            continue
        if units_col:
            if col + "_units" != units_col:
                raise ValueError
            if (df[units_col] == df[units_col][0]).all():
                # Make a PintArray
                new_col = PintArray(df[col], dtype=f"pint[{ureg(df[units_col][0]).u}]")
            else:
                # Make a pd.Series of Quantity in a way that does not throw UnitStrippedWarning
                new_col = pd.Series(data=df[col], name=col) * pd.Series(
                    data=df[units_col].map(lambda x: ureg(x).u), name=col
                )
            df = df.drop(columns=units_col)
            df[col] = new_col
            units_col = None
    return df

In [None]:
tablenames = (
    "company_data",
    "target_data",
    "trajectory_data",
    "emissions_data",
    "production_data",
)

In [None]:
schema_create = engine.execute(
    f"""
CREATE SCHEMA if not exists {ingest_catalog}.{demo_schema}
 AUTHORIZATION USER michaeltiemannosc
 WITH (
     location = 's3a://osc-datacommons-s3-bucket-dev02/data/demo_dv.db'
 )
"""
)
schema_create.fetchall()

In [None]:
targets_to_sql = dequantify_df(targets_df.drop(columns="production_by_year"))
targets_to_sql.loc[:, :, :, "Steel"]

In [None]:
financial_df[financial_df.company_market_cap.isnull()]

In [None]:
dataframes = [
    financial_df.loc[financial_df.index.intersection(targets_df.reset_index("year").index)]
    .reset_index()
    .convert_dtypes(),
    dequantify_df(targets_df.drop(columns="production_by_year")).reset_index().convert_dtypes(),
    dequantify_df(trajectories_df.drop(columns="production_by_year")).reset_index().convert_dtypes(),
    dequantify_df(targets_df[["co2_s1_by_year", "co2_s2_by_year"]]).reset_index().convert_dtypes(),
    dequantify_df(targets_df[["production_by_year"]]).reset_index().convert_dtypes(),
]

for ingest_table, df in zip(tablenames, dataframes):
    drop_table = engine.execute(f"drop table if exists {demo_schema}.{ingest_table}")
    drop_table.fetchall()

    columnschema = osc.create_table_schema_pairs(df)

    tabledef = f"""
create table if not exists {ingest_catalog}.{demo_schema}.{ingest_table}(
{columnschema}
) with (
    format = 'ORC',
    partitioning = array['year']
)
"""

    print(tabledef)
    qres = engine.execute(tabledef)
    print(qres.fetchall())
    df.to_sql(
        ingest_table,
        con=engine,
        schema=demo_schema,
        if_exists="append",
        index=False,
        method=osc.TrinoBatchInsert(batch_size=1200, verbose=True),
    )

In [None]:
pdf = targets_df.pivot(index=["company_name", "company_lei", "company_id"], columns="year").reset_index()

In [None]:
pdf

In [None]:
stop!
# pdf.insert(1, 'company_lei', pdf.company_name.str.upper().map(gleif_match))
# pdf.insert(2, 'company_id', pdf.company_lei.map(rmi_lei_dict))
# pdf = pdf.set_index(['company_name','company_lei', 'company_id'], drop=True)
pdf.columns.names=[None,None]
pdf

In [None]:
ei_s1_df = pd.concat(
    [
        pdf.company_name,
        pdf.company_lei,
        pdf.company_id,
        pdf.ei_s1_target_by_year.reset_index(),
    ],
    axis=1,
).drop("index", axis=1)
ei_s1_df

In [None]:
ei_s2_df = pd.concat(
    [
        pdf.company_name,
        pdf.company_lei,
        pdf.company_id,
        pdf.ei_s2_target_by_year.reset_index(),
    ],
    axis=1,
).drop("index", axis=1)
ei_s2_df

In [None]:
ei_s1_df.iloc[:, 3] = 2 * ei_s1_df.iloc[:, 4] - ei_s1_df.iloc[:, 5]
ei_s1_df = ei_s1_df[ei_s1_df.company_id.notna()]
ei_s1_df.insert(3, "scope", "S1")
ei_s1_df.head(10)

In [None]:
ei_s2_df.iloc[:, 3] = 2 * ei_s2_df.iloc[:, 4] - ei_s2_df.iloc[:, 5]
ei_s2_df = ei_s2_df[ei_s2_df.company_id.notna()]
ei_s2_df.insert(3, "scope", "S2")
ei_s2_df.head(10)

In [None]:
ei_s1_df.iloc[:, 3] = 2 * ei_s1_df.iloc[:, 4] - ei_s1_df.iloc[:, 5]
ei_s1_df = co2_ei_df[co2_ei_df.company_id.notna()]
ei_s1_df.insert(3, "scope", "S1")
ei_s1_df.head(10)

In [None]:
co2_df = pd.concat(
    [
        pdf.company_name,
        pdf.company_lei,
        pdf.company_id,
        pdf.co2_target_by_year.reset_index(),
    ],
    axis=1,
).drop("index", axis=1)
co2_df = co2_df[co2_df.company_id.notna()]
co2_df.insert(3, "scope", "S1+S2")
co2_df.head()

In [None]:
gen_df = pd.concat(
    [
        pdf.company_name,
        pdf.company_lei,
        pdf.company_id,
        pdf.production_by_year.reset_index(),
    ],
    axis=1,
).drop("index", axis=1)
gen_df.iloc[:, 3] = 2 * gen_df.iloc[:, 4] - gen_df.iloc[:, 5]
gen_df = gen_df[gen_df.company_id.notna()]
gen_df.insert(3, "production", "TWh")
gen_df.head()

In [None]:
with pd.ExcelWriter("rmi-20220307-output.xlsx", datetime_format="YYYY") as writer:
    financial_df.to_excel(writer, sheet_name="fundamental_data", index=False)
    co2_ei_df.to_excel(writer, sheet_name="projected_ei_in_Wh", index=False)
    gen_df.to_excel(writer, sheet_name="projected_production", index=False)
    co2_df.to_excel(writer, sheet_name="projected_co2", index=False)

In [None]:
portfolio_zero = portfolio_df.copy()
portfolio_zero.target_probability = 0.0
portfolio_one = portfolio_df.copy()
portfolio_one.target_probability = 1.0

portfolio_df.to_csv("rmi-20220307-portfolio.csv", sep=";", index=False)

In [None]:
engine.execute(
    f"select count (*) from (select parent_name from {rmi_schema}.utility_information group by parent_name)"
).fetchall()

If the following is non-NULL, the Data Vault will reject the company data

In [None]:
engine.execute(
    f"select C.company_name, C.company_id, EI.* from {demo_schema}.company_data C left join {demo_schema}.intensity_data EI on EI.company_name=C.company_name where EI.co2_intensity_target_by_year is NULL"
).fetchall()