In [1]:
import os
import json
import re
import warnings

import pandas as pd
import numpy as np
import osc_ingest_trino as osc

from ITR import data_dir as json_data_dir
from ITR_examples import data_dir as xlsx_data_dir

from ITR.data.base_providers import (
    BaseProviderProductionBenchmark,
    BaseProviderIntensityBenchmark,
)
from ITR.data.data_warehouse import DataWarehouse
from ITR.data.osc_units import Q_
from ITR.data.template import TemplateProviderCompany
from ITR.data.vault_providers import (
    VaultCompanyDataProvider,
    create_table_from_df,
)
from ITR.interfaces import (
    EScope,
    IEIBenchmarkScopes,
    IProductionBenchmarkScopes,
    ProjectionControls,
)

# isort: split

import logging

# Trino catalog and schema that receive the ingested tables: both are passed to
# osc.attach_trino_engine, and the schema is also passed to every create_table_from_df call.
ingest_catalog = "osc_datacommons_dev"
ingest_schema = "demo_dv"
# Prefix prepended to every table name built in this notebook
# (e.g. f"{itr_prefix}company_data").
itr_prefix = "itr_"

# NOTE(review): presumably loads connection credentials from a dotenv file so the later
# osc.attach_* calls can authenticate -- confirm against the osc_ingest_trino docs.
osc.load_credentials_dotenv()

In [2]:
# Notebook-level logger: INFO and above, timestamped, written through a StreamHandler.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

formatter = logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)  # LoggingConfig.FORMAT
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
# logging.getLogger() hands back the same logger object on every execution of this
# cell, so attaching a handler unconditionally would make each message appear once
# per (re-)run.  Only attach when the logger has no handler yet.
if not logger.handlers:
    logger.addHandler(stream_handler)

logger.info("Start!")

# Sample workbook: read below for the "Portfolio" sheet and handed to
# TemplateProviderCompany for the company data.
company_data_path = os.path.join(xlsx_data_dir, "20220927 ITR V2 Sample Data.xlsx")

# Production benchmark (there's only one, and we have to stretch it from OECM to cover TPI)
benchmark_prod_json_file = "benchmark_production_OECM.json"
benchmark_prod_json = os.path.join(json_data_dir, benchmark_prod_json_file)
# Decode explicitly as UTF-8: without an encoding argument open() falls back to the
# platform's locale encoding, which is not guaranteed to be UTF-8.
with open(benchmark_prod_json, encoding="utf-8") as json_file:
    parsed_json = json.load(json_file)

# Validate the raw JSON into the typed benchmark model, then wrap it in the provider
# that the DataWarehouse consumes.
prod_bms = IProductionBenchmarkScopes.model_validate(parsed_json)
base_production_bm = BaseProviderProductionBenchmark(production_benchmarks=prod_bms)
logger.info("Load production benchmark from {}".format(benchmark_prod_json_file))

# Emission intensities
# File names of the emission-intensity benchmarks; each is joined onto json_data_dir
# when the chosen benchmark is loaded.

# OECM family
benchmark_EI_OECM_PC_file = "benchmark_EI_OECM_PC.json"
benchmark_EI_OECM_S3_file = "benchmark_EI_OECM_S3.json"
benchmark_EI_OECM_file = "benchmark_EI_OECM.json"  # Deprecated!

# TPI family
benchmark_EI_TPI_15_file = "benchmark_EI_TPI_1_5_degrees.json"
benchmark_EI_TPI_file = "benchmark_EI_TPI_2_degrees.json"
benchmark_EI_TPI_below_2_file = "benchmark_EI_TPI_below_2_degrees.json"

# Supplementary TPI 2-degree files that get merged into the base TPI 2-degree benchmark.
benchmark_EI_TPI_2deg_high_efficiency_file = "benchmark_EI_TPI_2_degrees_high_efficiency.json"
benchmark_EI_TPI_2deg_shift_improve_file = "benchmark_EI_TPI_2_degrees_shift_improve.json"

2023-12-01 15:08:09,521 - __main__ - INFO - Start!
2023-12-01 15:08:09,714 - __main__ - INFO - Load production benchmark from benchmark_production_OECM.json


In [3]:
# loading sample portfolio
df_portfolio = pd.read_excel(company_data_path, sheet_name="Portfolio")

# The investment-value header is expected to carry a three-letter code in square
# brackets (matched case-insensitively by the regex below).  Rename that column to
# plain "investment_value" and convert it to the pint dtype named by the code.
for col in df_portfolio.columns:
    # Headers are not guaranteed to be strings (a numeric header has no .startswith).
    if not (isinstance(col, str) and col.startswith("investment_value")):
        continue
    if match := re.match(r".*\[([A-Z]{3})\]", col, re.I):
        df_portfolio = df_portfolio.rename(columns={col: "investment_value"})
        df_portfolio["investment_value"] = df_portfolio["investment_value"].astype(
            f"pint[{match.group(1)}]"
        )
        # Stop at the first match: renaming a second column to the same name would
        # create duplicate "investment_value" columns, and the conversion above would
        # then operate on a DataFrame instead of a single column.
        break

In [4]:
# Name of the emission-intensity benchmark to load.
eibm = "OECM_S3"

# Benchmarks that map one-to-one onto a file.  Names starting with "TPI_2_degrees"
# are handled separately because they also pull in a supplementary file below.
exact_benchmark_files = {
    "OECM_PC": benchmark_EI_OECM_PC_file,
    "OECM_S3": benchmark_EI_OECM_S3_file,
    "TPI_15_degrees": benchmark_EI_TPI_15_file,
    "OECM": benchmark_EI_OECM_file,
}
if eibm.startswith("TPI_2_degrees"):
    benchmark_file = benchmark_EI_TPI_file
else:
    # Any name that is not listed falls back to the TPI below-2-degrees benchmark.
    benchmark_file = exact_benchmark_files.get(eibm, benchmark_EI_TPI_below_2_file)
if eibm == "OECM":
    logger.info(
        "OECM benchmark is for backward compatibility only.  Use OECM_PC instead."
    )

# load intensity benchmarks
# Files are decoded explicitly as UTF-8 rather than with the platform's locale default.
benchmark_EI = os.path.join(json_data_dir, benchmark_file)
with open(benchmark_EI, encoding="utf-8") as json_file:
    parsed_json = json.load(json_file)

if eibm.startswith("TPI_2_degrees"):
    # The TPI 2-degree benchmark is extended with either the "high efficiency" or the
    # "shift improve" variant, depending on the benchmark name.
    extra_EI = os.path.join(
        json_data_dir,
        (
            benchmark_EI_TPI_2deg_high_efficiency_file
            if "_high_efficiency" in eibm
            else benchmark_EI_TPI_2deg_shift_improve_file
        ),
    )
    with open(extra_EI, encoding="utf-8") as json_file:
        extra_json = json.load(json_file)
    # The file is only needed for parsing; merge after it has been closed.
    for scope_name in EScope.get_scopes():
        if scope_name not in extra_json:
            continue
        if scope_name in parsed_json:
            # Scope present in both files: append the extra benchmarks.
            parsed_json[scope_name]["benchmarks"] += extra_json[scope_name][
                "benchmarks"
            ]
        else:
            # Scope only in the extra file: adopt it wholesale.
            parsed_json[scope_name] = extra_json[scope_name]

EI_bm = BaseProviderIntensityBenchmark(
    EI_benchmarks=IEIBenchmarkScopes.model_validate(parsed_json)
)

In [5]:
# We fill a conventional DataWarehouse with template data
# After the ingestion process is complete, downstream users can access DataVaultWarehouse

# Company data provider built from the sample workbook with default ProjectionControls.
template_company_data = TemplateProviderCompany(
    company_data_path, projection_controls=ProjectionControls()
)
# Combine the company data with the production and emission-intensity benchmark
# providers built in earlier cells.  `Warehouse` is read again later to project
# company production.
# NOTE(review): estimate_missing_data receives the DataWarehouse.estimate_missing_s3_data
# callable; what it estimates is defined inside ITR and not visible here.
Warehouse = DataWarehouse(
    template_company_data,
    benchmark_projected_production=base_production_bm,
    benchmarks_projected_ei=EI_bm,
    estimate_missing_data=DataWarehouse.estimate_missing_s3_data,
)

            ('Energy', 'PETRONAS_SOE', 'S2')],
           names=['sector', 'company_id', 'metric'])
            ( 'Electricity Utilities', 'CA2908761018'),
            ( 'Electricity Utilities', 'CA3495531079'),
            ( 'Electricity Utilities', 'CA87807B1076'),
            ( 'Electricity Utilities', 'US0185223007'),
            ( 'Electricity Utilities', 'US0188021085'),
            ( 'Electricity Utilities', 'US0236081024'),
            ( 'Electricity Utilities', 'US0255371017'),
            ( 'Electricity Utilities', 'US0921131092'),
            ( 'Electricity Utilities', 'US1258961002'),
            ( 'Electricity Utilities', 'US18551QAA58'),
            ( 'Electricity Utilities', 'US2091151041'),
            ( 'Electricity Utilities', 'US2333311072'),
            ( 'Electricity Utilities', 'US25746U1097'),
            ( 'Electricity Utilities', 'US26441C2044'),
            ( 'Electricity Utilities', 'US29364G1031'),
            ( 'Electricity Utilities', 'US30034W1062'),
    

In [6]:
# This will have identity of main notebook user, not OS-Climate-User1
# Engine requested for ingest_catalog / ingest_schema; it is reused by every
# create_table_from_df call and SQL query later in the notebook.
# NOTE(review): verbose=True appears to echo the connect string (see recorded output).
ingest_engine = osc.attach_trino_engine(
    verbose=True, catalog=ingest_catalog, schema=ingest_schema
)

using connect string: trino://MichaelTiemannOSC@trino-secure-odh-trino.apps.odh-cl2.apps.os-climate.org:443/osc_datacommons_dev/demo_dv


In [7]:
# bucket must be configured with credentials for trino, and accessible to the hive catalog
# You may need to use a different prefix here depending on how you name your credentials.env variables
hive_bucket = osc.attach_s3_bucket("S3_OSCCL2")

# Hive catalog and schema handed, together with hive_bucket, to every
# create_table_from_df call below.
# NOTE(review): presumably where the parquet files are staged before the Trino table
# is created -- confirm against create_table_from_df.
hive_catalog = "osc_datacommons_hive_ingest"
hive_schema = "ingest"

## ITR Company Data

In [8]:
# Columns taken from df_fundamentals for the company table.  report_date is only used
# to derive the "year" column and is dropped before ingestion.
fundamental_cols = [
    "company_name",
    "company_lei",
    "company_id",
    "sector",
    "country",
    "region",
    "exposure",
    "currency",
    "report_date",
    "company_market_cap",
    "company_revenue",
    "company_enterprise_value",
    "company_ev_plus_cash",
    "company_total_assets",
    "cash",
    "debt",
]

# "cash" and "debt" may be absent from the template data; create any missing one as
# NaN multiplied by company_revenue so the column selection below succeeds.
fundamentals = template_company_data.df_fundamentals
for optional_col in ("cash", "debt"):
    if optional_col in fundamentals.columns:
        continue
    fundamentals[optional_col] = np.nan * fundamentals["company_revenue"]

# Select the ingest columns and give them their table names.
df = fundamentals[fundamental_cols].rename(
    columns={
        "company_enterprise_value": "company_ev",
        "company_ev_plus_cash": "company_evic",
        "cash": "company_cash_equivalents",
        "debt": "company_debt",
    }
)
# The table stores the reporting year rather than the full report date.
df["year"] = df.report_date.dt.year
df = df.drop(columns="report_date")

company_tablename = f"{itr_prefix}company_data"

# ingest company data
create_table_from_df(
    df,
    ingest_schema,
    company_tablename,
    ingest_engine,
    hive_bucket,
    hive_catalog,
    hive_schema,
    verbose=True,
)

drop table if exists demo_dv.itr_company_data
enforcing dataframe partition column order

verifying existence of table demo_dv.itr_company_data
create table if not exists demo_dv.itr_company_data (
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    country varchar,
    region varchar,
    exposure varchar,
    currency varchar,
    company_market_cap double,
    company_market_cap_units varchar,
    company_revenue double,
    company_revenue_units varchar,
    company_ev double,
    company_ev_units varchar,
    company_evic double,
    company_evic_units varchar,
    company_total_assets double,
    company_total_assets_units varchar,
    company_cash_equivalents bigint,
    company_cash_equivalents_units varchar,
    company_debt bigint,
    company_debt_units varchar,
    year integer
) with (
    format = 'parquet',
    partitioning = array['year']
)

staging dataframe parquet to s3 osc-datacommons-s3-bucket-dev02
/tmp/ingest_temp_7b

In [9]:
# Base-year intensity and production for every company in the fundamentals table
# except the single id excluded here.
# NOTE(review): the reason for skipping this id is not recorded in the notebook.
excluded_company_id = "US6745991058-chem"
base_year_company_ids = [
    cid
    for cid in template_company_data.df_fundamentals.company_id.values
    if cid != excluded_company_id
]
company_info_at_base_year = (
    template_company_data.get_company_intensity_and_production_at_base_year(
        base_year_company_ids
    )
)


# Project each company's production with the warehouse's production benchmark; all
# warnings raised during the projection are suppressed.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    # See https://github.com/hgrecco/pint-pandas/issues/128
    production_benchmark = Warehouse.benchmark_projected_production
    projected_production = production_benchmark.get_company_projected_production(
        company_info_at_base_year
    )

In [10]:
# Spot-check the element type stored in the projected production frame
# (first row, first column).
type(projected_production.iloc[0, 0])

pint.Quantity

In [16]:
# Vault provider constructed on the company table ingested earlier.
company_data = VaultCompanyDataProvider(ingest_engine, company_tablename)

# NOTE(review): this name is not read below -- the ingest call takes its table name
# from company_data._production_table instead; confirm the two agree.
production_tablename = f"{itr_prefix}production_data"

# Dropping the "scope" level leaves repeated rows that must be collapsed.
# DataFrame.drop_duplicates() compares column values only and ignores the index, so
# two different companies with identical production rows would be merged into one
# and a company would silently disappear.  Compare index and values together instead.
production_wide = projected_production.droplevel("scope")
is_repeated_row = production_wide.reset_index().duplicated().to_numpy()
production_wide = production_wide[~is_repeated_row]
production_wide.columns.set_names("year", inplace=True)

# Reshape to long format: the year moves from the column axis into a regular column.
production_long = (
    production_wide.unstack(level=0)
    .to_frame("production_by_year")
    .reset_index("year")
)

# Identity columns of the companies present in the ingested company table.
company_identity = pd.read_sql(
    f"select distinct company_id, company_name, company_lei, sector from {company_data._company_table}",
    ingest_engine,
)
production_merged = production_long.merge(
    company_identity, on="company_id"
).reset_index()
production_df = production_merged[
    # Reorder columns
    [
        "company_name",
        "company_lei",
        "company_id",
        "sector",
        "year",
        "production_by_year",
    ]
]

# Ingest productions into Data Vault
create_table_from_df(
    production_df,
    ingest_schema,
    company_data._production_table,
    ingest_engine,
    hive_bucket,
    hive_catalog,
    hive_schema,
    verbose=True,
)

drop table if exists demo_dv.itr_production_data


  return _isna_array(np.asarray(obj), inf_as_na=inf_as_na)


enforcing dataframe partition column order

verifying existence of table demo_dv.itr_production_data
create table if not exists demo_dv.itr_production_data (
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    production_by_year double,
    production_by_year_units varchar,
    year bigint
) with (
    format = 'parquet',
    partitioning = array['year']
)

staging dataframe parquet to s3 osc-datacommons-s3-bucket-dev02
/tmp/ingest_temp_21613342/_common_metadata  -->  trino/ingest/ingest_temp_21613342/_common_metadata
/tmp/ingest_temp_21613342/_metadata  -->  trino/ingest/ingest_temp_21613342/_metadata
/tmp/ingest_temp_21613342/year=2032/part.0.parquet  -->  trino/ingest/ingest_temp_21613342/year=2032/part.0.parquet
/tmp/ingest_temp_21613342/year=2035/part.0.parquet  -->  trino/ingest/ingest_temp_21613342/year=2035/part.0.parquet
/tmp/ingest_temp_21613342/year=2050/part.0.parquet  -->  trino/ingest/ingest_temp_21613342/year=2050/part.0.par

In [15]:
target_tablename = f"{itr_prefix}target_data"
trajectory_tablename = f"{itr_prefix}trajectory_data"

# NOTE(review): these two lists are not used anywhere in this cell.
target_dfs = []
trajectory_dfs = []
# Table names, in the same order as the company attributes iterated below.
projection_tablename = [target_tablename, trajectory_tablename]

scope_names = list(EScope.get_scopes())
identity_columns = ["company_name", "company_lei", "company_id", "sector"]

# One pass per projection kind: build a table with one row per company and year and
# one emission-intensity column per scope, then ingest it.
for tablename, projection in zip(
    projection_tablename, ["projected_targets", "projected_intensities"]
):
    company_frames = []
    for company in template_company_data._companies:
        company_projections = getattr(company, projection)
        # Scopes without a projection contribute an empty object-typed series so that
        # every scope still yields a column.
        ei_by_scope = [
            (
                company_projections[scope].projections
                if company_projections[scope]
                else pd.Series(dtype="object")
            )
            for scope in scope_names
        ]
        ei_data = pd.concat(ei_by_scope, axis=1).reset_index()
        ei_data.columns = ["year"] + [
            f"ei_{scope.lower()}_by_year" for scope in scope_names
        ]
        # The company's identity is repeated on every row; company_lei is written as
        # an empty string here.
        identity_row = [company.company_name, "", company.company_id, company.sector]
        identity_df = pd.DataFrame(
            data=[identity_row for _ in ei_data.index],
            columns=identity_columns,
        )
        company_frames.append(pd.concat([identity_df, ei_data], axis=1))
    projection_df = pd.concat(company_frames).reset_index(drop=True)
    if tablename == target_tablename:
        # Kept for the emissions calculation in a later cell.
        target_df = projection_df
    create_table_from_df(
        projection_df,
        ingest_schema,
        tablename,
        ingest_engine,
        hive_bucket,
        hive_catalog,
        hive_schema,
        verbose=True,
    )

drop table if exists demo_dv.itr_target_data


  return _isna_array(np.asarray(obj), inf_as_na=inf_as_na)


enforcing dataframe partition column order

verifying existence of table demo_dv.itr_target_data
create table if not exists demo_dv.itr_target_data (
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    ei_s1_by_year double,
    ei_s1_by_year_units varchar,
    ei_s2_by_year double,
    ei_s2_by_year_units varchar,
    ei_s3_by_year double,
    ei_s3_by_year_units varchar,
    ei_s1s2_by_year double,
    ei_s1s2_by_year_units varchar,
    ei_s1s2s3_by_year double,
    ei_s1s2s3_by_year_units varchar,
    year bigint
) with (
    format = 'parquet',
    partitioning = array['year']
)

staging dataframe parquet to s3 osc-datacommons-s3-bucket-dev02
/tmp/ingest_temp_bc56b732/_common_metadata  -->  trino/ingest/ingest_temp_bc56b732/_common_metadata
/tmp/ingest_temp_bc56b732/_metadata  -->  trino/ingest/ingest_temp_bc56b732/_metadata
/tmp/ingest_temp_bc56b732/year=2032/part.0.parquet  -->  trino/ingest/ingest_temp_bc56b732/year=2032/part.0.parqu

  return _isna_array(np.asarray(obj), inf_as_na=inf_as_na)


enforcing dataframe partition column order

verifying existence of table demo_dv.itr_trajectory_data
create table if not exists demo_dv.itr_trajectory_data (
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    ei_s1_by_year double,
    ei_s1_by_year_units varchar,
    ei_s2_by_year double,
    ei_s2_by_year_units varchar,
    ei_s3_by_year double,
    ei_s3_by_year_units varchar,
    ei_s1s2_by_year double,
    ei_s1s2_by_year_units varchar,
    ei_s1s2s3_by_year double,
    ei_s1s2s3_by_year_units varchar,
    year bigint
) with (
    format = 'parquet',
    partitioning = array['year']
)

staging dataframe parquet to s3 osc-datacommons-s3-bucket-dev02
/tmp/ingest_temp_993003f0/_common_metadata  -->  trino/ingest/ingest_temp_993003f0/_common_metadata
/tmp/ingest_temp_993003f0/_metadata  -->  trino/ingest/ingest_temp_993003f0/_metadata
/tmp/ingest_temp_993003f0/year=2032/part.0.parquet  -->  trino/ingest/ingest_temp_993003f0/year=2032/part

In [13]:
# Sanity check: row counts of the target and trajectory tables.
# NOTE(review): _do_sql is an underscore-prefixed (private) helper of osc_ingest_trino.
# The value of the final call is what the notebook displays for this cell, so it must
# stay the last expression.
osc._do_sql(f"select count (*) from {target_tablename}", ingest_engine, verbose=True)
osc._do_sql(
    f"select count (*) from {trajectory_tablename}", ingest_engine, verbose=True
)

select count (*) from itr_target_data
[(76,)]
select count (*) from itr_trajectory_data
[(76,)]


[(76,)]

In [17]:
emissions_tablename = f"{itr_prefix}emissions_data"

# Companies left out of the emissions table.
# NOTE(review): the reason for excluding these ids is not recorded in the notebook.
excluded_company_ids = [
    "DE000SYM9999",
    "NO0010657505",
    "GB0000961622",
    "DE000BASF111",
    "IE00BZ12WP82",
    "FR0004024222",
]

# Create emissions_data table using production_df and math
emissions_df = production_df.merge(
    target_df.drop(columns=["company_name", "company_lei", "sector"]),
    on=["company_id", "year"],
)
# Take an explicit copy of the filtered rows: the loop below assigns into this frame,
# and assigning into the result of a boolean filter is the pattern pandas flags with
# SettingWithCopyWarning.
emissions_df = emissions_df[
    ~emissions_df.company_id.isin(excluded_company_ids)
].copy()
print(emissions_df.index.names)
for scope in EScope.get_scopes():
    ei_col = f"ei_{scope.lower()}_by_year"
    co2_col = f"co2_{scope.lower()}_by_year"
    # Replace missing intensities with a NaN quantity whose denominator is the row's
    # production unit, so the multiplication below can be converted like any other row.
    mask = emissions_df[ei_col].isna()
    emissions_df.loc[mask, ei_col] = emissions_df["production_by_year"].map(
        lambda x: Q_(np.nan, f"Mt CO2 / ({str(x.u)})")
    )
    # emissions = production * emission intensity, expressed in Mt CO2e
    emissions_df[co2_col] = (
        emissions_df["production_by_year"]
        .mul(emissions_df[ei_col])
        .astype("pint[Mt CO2e]")
    )
    emissions_df = emissions_df.drop(columns=ei_col)
emissions_df = emissions_df.drop(columns="production_by_year")
create_table_from_df(
    emissions_df,
    ingest_schema,
    emissions_tablename,
    ingest_engine,
    hive_bucket,
    hive_catalog,
    hive_schema,
    verbose=True,
)

[None]
drop table if exists demo_dv.itr_emissions_data
enforcing dataframe partition column order

verifying existence of table demo_dv.itr_emissions_data
create table if not exists demo_dv.itr_emissions_data (
    company_name varchar,
    company_lei varchar,
    company_id varchar,
    sector varchar,
    co2_s1_by_year double,
    co2_s1_by_year_units varchar,
    co2_s2_by_year double,
    co2_s2_by_year_units varchar,
    co2_s3_by_year double,
    co2_s3_by_year_units varchar,
    co2_s1s2_by_year double,
    co2_s1s2_by_year_units varchar,
    co2_s1s2s3_by_year double,
    co2_s1s2s3_by_year_units varchar,
    year bigint
) with (
    format = 'parquet',
    partitioning = array['year']
)

staging dataframe parquet to s3 osc-datacommons-s3-bucket-dev02
/tmp/ingest_temp_e565ca02/_common_metadata  -->  trino/ingest/ingest_temp_e565ca02/_common_metadata
/tmp/ingest_temp_e565ca02/_metadata  -->  trino/ingest/ingest_temp_e565ca02/_metadata
/tmp/ingest_temp_e565ca02/year=2032/part.0

In [18]:
# Display the emissions frame that was just ingested (the notebook shows a truncated
# view of long frames).
emissions_df

Unnamed: 0,company_name,company_lei,company_id,sector,year,co2_s1_by_year,co2_s2_by_year,co2_s3_by_year,co2_s1s2_by_year,co2_s1s2s3_by_year
0,SK Innovation,988400PXP70BWVSJVF07,096770.SK,Energy,2019,10.117595999999999,2.400554,151.79999999999998,12.518149999999999,164.31815
1,SK Innovation,988400PXP70BWVSJVF07,096770.SK,Energy,2020,11.099542743700853,2.733049295351212,131.46999999999997,12.093237999999998,164.21174403905204
2,SK Innovation,988400PXP70BWVSJVF07,096770.SK,Energy,2021,8.963148159254997,2.501464925055745,118.35,10.899999999999999,135.94506799515258
3,SK Innovation,988400PXP70BWVSJVF07,096770.SK,Energy,2022,8.144233428818712,2.2729195034693954,117.96502886698957,10.551102884701514,128.5161317516911
4,SK Innovation,988400PXP70BWVSJVF07,096770.SK,Energy,2023,7.395703336263191,2.0640172585653014,117.58130997540924,10.213373585647398,127.79468356105663
...,...,...,...,...,...,...,...,...,...,...
2427,"Xcel Energy, Inc.",LGJNMI9GH8XIDG5RCM61,US98389B1008,Electricity Utilities,2046,3.769433931712026,0.0,,2.1158195963480084,
2428,"Xcel Energy, Inc.",LGJNMI9GH8XIDG5RCM61,US98389B1008,Electricity Utilities,2047,2.7422371696791807,0.0,,1.5127042549837622,
2429,"Xcel Energy, Inc.",LGJNMI9GH8XIDG5RCM61,US98389B1008,Electricity Utilities,2048,1.7745981112853353,0.0,,0.9629449574412783,
2430,"Xcel Energy, Inc.",LGJNMI9GH8XIDG5RCM61,US98389B1008,Electricity Utilities,2049,0.8619305048286594,0.0,,0.4604972513316361,
