<font size="5">PCAF data pipeline to ingest the fossil CO2 and GHG emissions data published by EDGAR</font>


In [1]:
# 'capture' magic prevents long outputs from spamming your notebook
#%%capture pipoutput

# For loading predefined environment variables from files
# Typically used to load sensitive access credentials
%pip install python-dotenv

# Standard python package for interacting with S3 buckets
%pip install boto3

# Interacting with Trino and using Trino with sqlalchemy
%pip install trino sqlalchemy sqlalchemy-trino

# Pandas and parquet file i/o
%pip install pandas pyarrow fastparquet

# OS-Climate utilities to make data ingest easier
%pip install osc-ingest-tools
%pip install country_converter --upgrade
%pip install pint-pandas
%pip install openscm-units
%pip install pint

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


Load environment variables (credentials) from `credentials.env`

In [2]:
from dotenv import dotenv_values, load_dotenv
import osc_ingest_trino as osc
import os
import pathlib
# Credentials are read from `credentials.env`. Its directory can be overridden
# with CREDENTIAL_DOTENV_DIR; otherwise $PWD, then the app-root default, is used.
dotenv_dir = os.environ.get(
    'CREDENTIAL_DOTENV_DIR',
    os.environ.get('PWD', '/opt/app-root/src'),
)
dotenv_path = pathlib.Path(dotenv_dir).joinpath('credentials.env')
if dotenv_path.exists():
    # override=True: values from the file win over variables already set
    load_dotenv(dotenv_path=dotenv_path, override=True)

In [3]:
# Target location for the ingested EDGAR data; the catalog must be one that is
# configured for Iceberg.
# NOTE(review): Trino typically folds identifiers to lowercase, so the table may
# show up as sf_edgar_fossil_co2 — confirm before querying it from elsewhere.
ingest_catalog, ingest_schema, ingest_table = (
    'osc_datacommons_dev',       # catalog
    'pcaf_sovereign_footprint',  # schema
    'sf_edgar_fossil_CO2',       # table
)

In [4]:
import trino
from sqlalchemy.engine import create_engine

# Build a SQLAlchemy engine for Trino from the TRINO_* environment variables,
# authenticating with a JWT token over HTTPS.
env_var_prefix = 'TRINO'

sqlstring = 'trino://{user}@{host}:{port}/'.format(
    user=os.environ[f'{env_var_prefix}_USER'],
    host=os.environ[f'{env_var_prefix}_HOST'],
    port=os.environ[f'{env_var_prefix}_PORT'],
)
sqlargs = {
    'auth': trino.auth.JWTAuthentication(os.environ[f'{env_var_prefix}_PASSWD']),
    'http_scheme': 'https',
    # Use the configured ingest catalog rather than repeating the literal:
    # df.to_sql() writes into the engine's default catalog, while the
    # drop/select statements are qualified with ingest_catalog, so the two
    # must stay in sync.
    'catalog': ingest_catalog,
}
engine = create_engine(sqlstring, connect_args=sqlargs)
connection = engine.connect()

trino_bucket = osc.attach_s3_bucket("S3_DEV")

  res = connection.execute(sql.text(query))


In [5]:
import boto3

# S3 resource for the landing bucket that holds the raw EDGAR files; every
# connection setting comes from the S3_LANDING_* environment variables.
s3_source = boto3.resource(
    "s3",
    endpoint_url=os.environ['S3_LANDING_ENDPOINT'],
    aws_access_key_id=os.environ['S3_LANDING_ACCESS_KEY'],
    aws_secret_access_key=os.environ['S3_LANDING_SECRET_KEY'],
)
source_bucket = s3_source.Bucket(
    os.environ['S3_LANDING_BUCKET']
)

Verify the Trino connection (opened above with JWT authentication) by listing the schemas in the ingest catalog

In [6]:
from sqlalchemy import text

# Show available schemas to ensure the Trino connection is set correctly.
# Engine.execute() is deprecated in SQLAlchemy 1.4 and removed in 2.0, so the
# query runs on an explicit, automatically closed connection instead.
with engine.connect() as conn:
    schema_read = conn.execute(text(f'show schemas in {ingest_catalog}'))
    for row in schema_read.fetchall():
        print(row)

('default',)
('demo_dv',)
('iceberg_demo',)
('information_schema',)
('pcaf_sovereign_footprint',)
('sandbox',)


In [7]:
pip install country_converter --upgrade

Note: you may need to restart the kernel to use updated packages.


<font size="3">Parse and load EDGAR fossil CO2 and GHG values</font>

In [8]:
import pandas as pd
import ParseXLS as parser

# Download the EDGAR workbook from the landing bucket to /tmp. That is the path
# the parser lists in its `file_list` output (presumably configured in Edgar.ini).
edgar_filename = 'EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls'
edgar_file = s3_source.Object(os.environ['S3_LANDING_BUCKET'],
                              f'PCAF-sovereign-footprint/EDGAR/{edgar_filename}')
edgar_file.download_file(f'/tmp/{edgar_filename}')

# Parse the workbook into a single DataFrame according to Edgar.ini / EDGAR.csv.
df = parser.process('Edgar.ini', 'EDGAR.csv')
# NOTE(review): astype('str') turns missing ISO codes into the literal string
# 'nan' — confirm the parser never leaves country_iso_code empty.
df = df.astype({'validity_date': 'int32', 'country_iso_code': 'str'})

# Sector sheets produce attributes like "fossil_CO2_by_sector_and_country:<Sector>".
# Split only on the first ':' so a ':' inside a sector name cannot produce a
# third column and break the two-column assignment.
df[['attribute', 'sector']] = df['attribute'].str.split(':', n=1, expand=True)

df = df.convert_dtypes()
df.info(verbose=True)
# Keep the target-table columns in a fixed order and drop rows without a value.
df = df[['rec_source', 'data_provider', 'country_iso_code', 'country_name',
         'validity_date', 'attribute', 'sector', 'value', 'value_units']].dropna(subset=['value'])
#df


Edgar.ini
file_list:
['/tmp/EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls']
/tmp/EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls
2
xls
/tmp/EDGARv6.0_FT2020_fossil_CO2_GHG_booklet202
<configparser.ConfigParser object at 0x7fa394bd87f0>
['']
eval_components
['$sheet_name']
['Country', 1970, 1971, 1972, 1973, 1974, 1975, 1976, 1977, 1978, 1979, 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020]
$sheet_name
['']
eval_components
['"fossil_CO2_by_sector_and_country:"', 'Sector']
['Sector', 'Country', 1970, 1971, 1972, 1973, 1974, 1975, 1976, 1977, 1978, 1979, 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 20

More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular expression match for France and Monaco
More then one regular ex

210


More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression match for Italy, San Marino and

1036


More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression m

208


More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Italy, San Marino and the Holy See
More then one regular expression m

208


More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match

212


More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match for Israel and Palestine, State of
More then one regular expression match

1874
list columns
['country_iso_code', 'country_name', 'attribute', 'units', 'year', 'value']
['country_iso_code', 'country_name', 'attribute', 'value_units', 'value', 'rec_source', 'data_provider', 'validity_date']
<class 'pandas.core.frame.DataFrame'>
Int64Index: 94707 entries, 0 to 95571
Data columns (total 9 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   rec_source        94707 non-null  string 
 1   data_provider     94707 non-null  string 
 2   country_name      94707 non-null  string 
 3   country_iso_code  94707 non-null  string 
 4   validity_date     94707 non-null  Int32  
 5   attribute         94707 non-null  string 
 6   value             89414 non-null  Float64
 7   value_units       94707 non-null  string 
 8   sector            52479 non-null  string 
dtypes: Float64(1), Int32(1), string(7)
memory usage: 7.0 MB


In [9]:
# Scratch inspection: first row of the cleaned frame, with nullable dtypes.
df1 = df.head(1).convert_dtypes()


In [10]:
# List the distinct attribute values that will be loaded.
# NOTE(review): attribute names appear to be derived from the workbook's sheet
# names ($sheet_name in the parser log). 'fossil_CO2_per_capita_by_countr' is
# exactly 31 characters, presumably Excel's sheet-name length limit truncating
# "..._by_country" — TODO confirm and normalise before loading.
df["attribute"].unique()

<StringArray>
[    'fossil_CO2_totals_by_country', 'fossil_CO2_by_sector_and_country',
  'fossil_CO2_per_capita_by_countr',            'GHG_totals_by_country',
    'fossil_CO2_per_GDP_by_country']
Length: 5, dtype: string

In [11]:
from dotenv import dotenv_values, load_dotenv
import osc_ingest_trino as osc
import os
import pathlib

In [12]:
import osc_ingest_trino as osc
from sqlalchemy import text

# Column/type pairs derived from df (presumably Trino column definitions);
# not used in this cell.
columnschema = osc.create_table_schema_pairs(df)

# Drop any previous version of the target table so the load starts from scratch.
sql = f"""
drop table if exists {ingest_catalog}.{ingest_schema}.{ingest_table}
"""
print(sql)
# Engine.execute() is deprecated in SQLAlchemy 1.4 and removed in 2.0;
# engine.begin() runs the DDL on a connection and commits on exit.
with engine.begin() as conn:
    qres = conn.execute(text(sql))
    print(qres.fetchall())



drop table if exists osc_datacommons_dev.pcaf_sovereign_footprint.sf_edgar_fossil_CO2

[(True,)]


In [13]:
print(ingest_catalog)
# Bulk-load the cleaned frame into the target table. With if_exists='append',
# pandas creates the table if it does not exist yet. Rows are written through
# osc's Trino batch inserter, 5000 records per batch.
df.to_sql(
    name=ingest_table,
    con=engine,
    schema=ingest_schema,
    index=False,
    if_exists='append',
    method=osc.TrinoBatchInsert(verbose=True, batch_size=5000),
)

osc_datacommons_dev
constructed fully qualified table name as: "pcaf_sovereign_footprint.sf_edgar_fossil_CO2"
inserting 5000 records
  ('EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls', 'EDGAR', 'AFG', 'Afghanistan', 1970, 'fossil_CO2_totals_by_country', NULL, 1.7182055194, 'Mt CO2/yr')
  ('EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls', 'EDGAR', 'ALB', 'Albania', 1970, 'fossil_CO2_totals_by_country', NULL, 4.813550215686, 'Mt CO2/yr')
  ('EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls', 'EDGAR', 'DZA', 'Algeria', 1970, 'fossil_CO2_totals_by_country', NULL, 18.93157288567, 'Mt CO2/yr')
  ...
  ('EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls', 'EDGAR', 'KGZ', 'Kyrgyzstan', 1973, 'fossil_CO2_totals_by_country', NULL, 16.641630259, 'Mt CO2/yr')
batch insert result: [(5000,)]
inserting 5000 records
  ('EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls', 'EDGAR', 'LAO', 'Laos', 1973, 'fossil_CO2_totals_by_country', NULL, 0.337056733992, 'Mt CO2/yr')
  ('EDGARv6.0_FT2020_fossil_CO2_GHG_b



In [14]:
# Read the freshly loaded table back and check that every row written by
# df.to_sql() arrived (row order from Trino is not guaranteed, so only the
# count is compared).
select_sql = f"""
select * from {ingest_catalog}.{ingest_schema}.{ingest_table}"""
edgar_loaded = pd.read_sql(select_sql, engine)
assert len(edgar_loaded) == len(df), (
    f"expected {len(df)} rows in {ingest_table}, found {len(edgar_loaded)}"
)
edgar_loaded


Unnamed: 0,rec_source,data_provider,country_iso_code,country_name,validity_date,attribute,sector,value,value_units
0,EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls,EDGAR,GNQ,Equatorial Guinea,1985,fossil_CO2_by_sector_and_country,Buildings,0.016048,Mt CO2/yr
1,EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls,EDGAR,ERI,Eritrea,1985,fossil_CO2_by_sector_and_country,Buildings,0.002594,Mt CO2/yr
2,EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls,EDGAR,EST,Estonia,1985,fossil_CO2_by_sector_and_country,Buildings,1.753495,Mt CO2/yr
3,EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls,EDGAR,SWZ,Eswatini,1985,fossil_CO2_by_sector_and_country,Buildings,0.078650,Mt CO2/yr
4,EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls,EDGAR,ETH,Ethiopia,1985,fossil_CO2_by_sector_and_country,Buildings,0.169601,Mt CO2/yr
...,...,...,...,...,...,...,...,...,...
89409,EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls,EDGAR,ARE,United Arab Emirates,1976,fossil_CO2_totals_by_country,,45.538784,Mt CO2/yr
89410,EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls,EDGAR,GBR,United Kingdom,1976,fossil_CO2_totals_by_country,,629.764726,Mt CO2/yr
89411,EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls,EDGAR,USA,United States,1976,fossil_CO2_totals_by_country,,4909.706902,Mt CO2/yr
89412,EDGARv6.0_FT2020_fossil_CO2_GHG_booklet2021.xls,EDGAR,URY,Uruguay,1976,fossil_CO2_totals_by_country,,5.891610,Mt CO2/yr
