## Load Earth System Science Data (ESSD) datasets

Copyright (C) 2021 OS-Climate

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Contributed by Michael Tiemann (Github: MichaelTiemannOSC)

In [1]:
import os
import pandas as pd
import numpy as np

import pint
import pint_pandas
import iam_units
from openscm_units import unit_registry
pint_pandas.PintType.ureg = unit_registry
ureg = unit_registry

import osc_ingest_trino as osc
import trino
from sqlalchemy.engine import create_engine

import python_pachyderm

import sys
import io

from pandas_profiling import ProfileReport

Load Credentials

In [2]:
# From the AWS Account page, copy the export scripts from the appropriate role using the "Command Line or Programmatic Access" link
# Paste the copied text into ~/credentials.env

# Load environment variables from credentials.env
# Later cells read TRINO_* and PACH_* settings from os.environ; presumably they are supplied by this file.
osc.load_credentials_dotenv()

In [3]:
env_var_prefix = 'TRINO'

# Connection URL trino://USER@HOST:PORT/ assembled from TRINO_USER, TRINO_HOST and TRINO_PORT.
sqlstring = 'trino://{}@{}:{}/'.format(
    *(os.environ[f'{env_var_prefix}_{key}'] for key in ('USER', 'HOST', 'PORT'))
)
# JWT auth over HTTPS; unqualified table names resolve against this catalog.
sqlargs = dict(
    auth=trino.auth.JWTAuthentication(os.environ[f'{env_var_prefix}_PASSWD']),
    http_scheme='https',
    catalog='osc_datacommons_dev',
)
engine = create_engine(sqlstring, connect_args=sqlargs)
connection = engine.connect()

In [4]:
# Destination for every table this notebook creates: catalog, schema, and a common table-name prefix.
ingest_catalog, ingest_schema, essd_table_prefix = 'osc_datacommons_dev', 'mdt_sandbox', 'essd_'

In [5]:
# Show available schemas to ensure trino connection is set correctly
# (ingest_schema should appear among the rows printed below)
schema_read = engine.execute(f'show schemas in {ingest_catalog}')
for row in schema_read.fetchall():
    print(row)

('default',)
('demo_dv',)
('iceberg_demo',)
('information_schema',)
('ingest',)
('mdt_sandbox',)
('pcaf_sovereign_footprint',)
('sandbox',)


In [6]:
# Create a pachyderm client by manually setting up host and port
client = python_pachyderm.Client(os.environ['PACH_ENDPOINT'], os.environ['PACH_PORT'])  # host/port from PACH_ENDPOINT / PACH_PORT

In [7]:
# Display list of repos
print(list(client.list_repo()))  # sanity check: the 'essd' repo read below should be listed

[repo {
  name: "essd"
  type: "user"
}
created {
  seconds: 1660385394
  nanos: 931518000
}
size_bytes_upper_bound: 45686859
branches {
  repo {
    name: "essd"
    type: "user"
  }
  name: "master"
}
, repo {
  name: "wri-gppd"
  type: "user"
}
created {
  seconds: 1659790074
  nanos: 399079000
}
size_bytes_upper_bound: 27086020
branches {
  repo {
    name: "wri-gppd"
    type: "user"
  }
  name: "master"
}
]


In [8]:
# Check the file commit
print(list(client.list_file(("essd","master"), "/ESSD/")))  # files under /ESSD/ on the master branch of repo 'essd'

[file {
  commit {
    branch {
      repo {
        name: "essd"
        type: "user"
      }
      name: "master"
    }
    id: "867e73d0fe764f2791204b7b1302d6ff"
  }
  path: "/ESSD/essd-13-5213-2021-supplement.pdf"
  datum: "default"
}
file_type: FILE
committed {
  seconds: 1660385399
  nanos: 278217000
}
size_bytes: 1108915
hash: "Bw\332P\022\003\014B\364\357e\317\032\'\016\350\353\323\362(\314\212\376\007\272\215\361|\272X\257\377"
, file {
  commit {
    branch {
      repo {
        name: "essd"
        type: "user"
      }
      name: "master"
    }
    id: "867e73d0fe764f2791204b7b1302d6ff"
  }
  path: "/ESSD/essd-13-5213-2021.pdf"
  datum: "default"
}
file_type: FILE
committed {
  seconds: 1660385399
  nanos: 278217000
}
size_bytes: 7243436
hash: "3\000\020\314\237\"\321R\202#\006!\031\241ll\377\340\207\350\273\327\034\206s\254\354\002\013r\354\246"
, file {
  commit {
    branch {
      repo {
        name: "essd"
        type: "user"
      }
      name: "master"
    }
    i

Initialize the DBT dictionary that we will write out as YAML at the end

In [9]:
# Accumulates DBT model metadata (model name -> description/columns) for the schema YAML written at the end.
dbt_dict = {'models': {}}

The following text describes DBT model properties

The following text describes DBT external properties

In [10]:
def create_trino_table_and_dbt_metadata(tablename, df, partitioning='', custom_meta_content='', custom_meta_fields='', verbose=False,
                                        dbt_models_dir="/opt/app-root/src/essd-ingest-pipeline/dbt/essd_transform/models"):
    """Replace the Trino table for `df` and register its DBT model metadata.

    The table is ``{ingest_catalog}.{ingest_schema}.{essd_table_prefix}{tablename}``.
    Any existing table of that name is dropped, then recreated in ORC format
    (optionally partitioned) and loaded from `df` in batches of 10000 rows.
    A DBT model file ``{dbt_models_dir}/{table}.sql`` selecting every column is
    written.  When `custom_meta_content` is given, the model description (and
    column descriptions/tags from `custom_meta_fields`) are stored in
    ``dbt_dict['models']`` for the schema YAML written at the end of the notebook.

    Parameters
    ----------
    tablename : str
        Suffix appended to `essd_table_prefix` to form the table name.
    df : pandas.DataFrame
        Data to load; its columns become the table columns.
    partitioning : str, optional
        Value of the Trino ``partitioning`` table property, e.g. ``"array['year']"``;
        empty for an unpartitioned table.
    custom_meta_content : dict, optional
        Table-level metadata; its ``'description'`` becomes the DBT model description.
    custom_meta_fields : dict, optional
        Maps column name -> dict with a ``'Description'`` and optional ``'tags'``.
        Only meaningful together with `custom_meta_content`.
    verbose : bool, optional
        Passed through to ``osc.TrinoBatchInsert``.
    dbt_models_dir : str, optional
        Directory the DBT ``.sql`` model file is written to.

    Raises
    ------
    ValueError
        If `custom_meta_fields` is given without `custom_meta_content`.
    """
    # Validate before any side effects (dbt_dict, DROP TABLE) so a bad call changes nothing.
    if custom_meta_fields and not custom_meta_content:
        raise ValueError("custom_meta_fields requires custom_meta_content (the table description)")

    ingest_table = f'{essd_table_prefix}{tablename}'

    if custom_meta_content:
        dbt_table = {'description': custom_meta_content['description']}
        if custom_meta_fields:
            dbt_columns = {}
            for name, field_meta in custom_meta_fields.items():
                dbt_columns[name] = {'description': field_meta['Description']}
                if 'tags' in field_meta:
                    dbt_columns[name]['tags'] = field_meta['tags']
            dbt_table['columns'] = dbt_columns
        dbt_dict['models'][ingest_table] = dbt_table

    columnschema = osc.create_table_schema_pairs(df, typemap={"datetime64[ns]":"timestamp(6)"})

    # Fully qualify the name so the DROP targets the same table the CREATE below does.
    drop_table = engine.execute(f"drop table if exists {ingest_catalog}.{ingest_schema}.{ingest_table}")
    drop_table.fetchall()

    tabledef = f"""
create table if not exists {ingest_catalog}.{ingest_schema}.{ingest_table}(
{columnschema}
) with (
{'partitioning = ' if partitioning else ''}{partitioning}{',' if partitioning else ''}
format = 'ORC'
)
"""
    print(tabledef)
    qres = engine.execute(tabledef)
    print(qres.fetchall())
    df.to_sql(ingest_table,
              con=engine, schema=ingest_schema, if_exists='append',
              index=False,
              method=osc.TrinoBatchInsert(batch_size = 10000, verbose = verbose))

    # DBT model that simply re-selects every column of the freshly loaded table.
    with open(f"{dbt_models_dir}/{ingest_table}.sql", "w", encoding="utf-8") as f:
        print("{{ config(materialized='table') }}" + f"""

with source_data as (
    select {', '.join(df.columns)}
    from {ingest_catalog}.{ingest_schema}.{ingest_table}
)

select * from source_data
""", file=f)

Load the ESSD data file from Pachyderm and parse it with pandas *read_excel*

In [11]:
# Use the pachyderm client to check out the source XLSX file
xlsx_file_name = "essd_ghg_data.xlsx"
f = client.get_file(("essd", "master"), "/ESSD/" + xlsx_file_name)
# Buffer the whole workbook in memory; several sheets are read from essd_bytes below.
essd_bytes = io.BytesIO(f.read())

In [12]:
# It takes ~90 seconds (!) to load nearly 600K rows of data
df = pd.read_excel(essd_bytes, sheet_name='data',
                   # nrows=100,
                   converters={'year': lambda x: pd.to_datetime(x, format='%Y')},  # year number -> timestamp
                   dtype={'gwp100_ar5':'int32', 'value':'float64'},
                   engine='openpyxl')
# NOTE(review): the metadata cell records 'value' Units as 'tons' of the gas species (with
# gwp100_ar5 in 't CO2e/tons'), which conflicts with labelling every row 't CO2e' here — confirm.
df['value_units'] = 't CO2e'

In [13]:
# Put year at the end to make for more friendly partitioning
# convert_dtypes() switches to pandas nullable dtypes (string, Int32, Float64).
essd_data_df = osc.enforce_partition_column_order(df, ['year'], inplace=False).convert_dtypes()

display(essd_data_df.columns)

Index(['ISO', 'country', 'region_ar6_6', 'region_ar6_10', 'region_ar6_22',
       'region_ar6_dev', 'sector_title', 'subsector_title', 'gas',
       'gwp100_ar5', 'value', 'value_units', 'year'],
      dtype='object')

In [14]:
# Generate profile for the data set and render it as an HTML report
# Note that minimal=True is required due to incompatibility between pandas 1.4.x and pandas-profiling
# This disables expensive computations such as correlations and duplicate row detection
# Refer to GitHub issue at https://github.com/ydataai/pandas-profiling/issues/911 for information

profile = ProfileReport(essd_data_df, title="Profiling Report for " + xlsx_file_name, minimal=True)
# Path is relative to the notebook's working directory.
profile.to_file("../reports/profile_df_essd_ghg_data.html")

# The code below allows generating the profile into a HTML version and display it within the notebook
# profile.to_notebook_iframe()

# The code below allows generating the profile into an interactive widget (not working)
# profile.to_widgets()

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

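Construct the table-level metadata (`custom_meta_content`) and column-level metadata (`custom_meta_fields`) that `create_trino_table_and_dbt_metadata` records for the DBT schema file.
The table description comes from the workbook's *info* sheet. The column descriptions come from its *metadata* sheet, supplemented by the descriptions defined below.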

In [15]:
# Read the 'info' sheet (labels in column 0) and transpose it so each label becomes a column.
essd_content_df = (
    pd.read_excel(essd_bytes, sheet_name='info', header=None)
      .dropna(axis=0)
      .set_index(0)
      .T
)
custom_meta_content = dict(
    title='Earth System Science Data (ESSD) Dataset',
    author=essd_content_df[['Author & contact']].squeeze(),
    contact=essd_content_df[['Author & contact']].squeeze(),
    description=essd_content_df[['Data description']].squeeze(),
    release_date=essd_content_df[['Last date of compilation']].squeeze(),
    # How should we describe our transformative step here?
)

In [16]:
essd_metadata_df = pd.read_excel(essd_bytes, sheet_name='metadata')
# One entry per 'metadata' row keyed by 'Variable'; v==v is False for NaN, so empty cells are dropped.
custom_meta_fields = { d['Variable']: {k:v for k,v in d.items() if k!='Variable' and v==v} for d in essd_metadata_df.to_dict('records') }
# ??? custom_meta_fields['gas'] is a kind of Unit, namely a species of gas.  How do we explain that?
custom_meta_fields['value']['Units'] = 'tons' # combined with gas species, we'd get 't ${gas_species}'
custom_meta_fields['gwp100_ar5']['Units'] = 't CO2e/tons'
del(essd_metadata_df)

def description_is (s):
    """Return a column-metadata dict whose only entry is 'Description': s."""
    return { 'Description': s}

custom_meta_fields['ISO'] = description_is("ISO 3166 3-letter code")
custom_meta_fields['country'] = description_is("ISO 3166 country name")
custom_meta_fields['region_ar6_6'] = description_is("IPCC ar6 6-region climate boundaries")
custom_meta_fields['region_ar6_10'] = description_is("IPCC ar6 10-region climate boundaries")
custom_meta_fields['region_ar6_22'] = description_is("IPCC ar6 22-region climate boundaries")
custom_meta_fields['region_ar6_dev'] = description_is("IPCC ar6 developed/ldc country")
custom_meta_fields['sector_title'] = description_is("IPCC ar6 5 Principal Sector Names")
custom_meta_fields['subsector_title'] = description_is("IPCC ar6 Sub-Sector Names")
# value_units is added at load time (every row labelled 't CO2e'); without an entry here the
# column would be missing from the DBT schema.
# NOTE(review): 'value' is documented above as tons of the gas species, which conflicts with the
# 't CO2e' label; confirm which is correct and update one of them.
custom_meta_fields['value_units'] = description_is("pint[t CO2e]")
custom_meta_fields['year'] = description_is("Year of Data Observation")

# Tag columns by the standard their description cites (ISO before IPCC); year is tagged 'annual'.
for f in custom_meta_fields:
    if 'ISO' in custom_meta_fields[f]['Description']:
        custom_meta_fields[f]['tags'] = ['ISO']
    elif 'IPCC' in custom_meta_fields[f]['Description']:
        custom_meta_fields[f]['tags'] = ['IPCC']
    elif f == 'year':
        custom_meta_fields[f]['tags'] = ['annual']

In [17]:
# Dtypes drive the generated DDL: string -> varchar, Int32 -> integer, Float64 -> double, datetime64 -> timestamp(6)
essd_data_df.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 568341 entries, 0 to 568340
Data columns (total 13 columns):
 #   Column           Non-Null Count   Dtype         
---  ------           --------------   -----         
 0   ISO              568341 non-null  string        
 1   country          568341 non-null  string        
 2   region_ar6_6     568341 non-null  string        
 3   region_ar6_10    568341 non-null  string        
 4   region_ar6_22    568341 non-null  string        
 5   region_ar6_dev   568341 non-null  string        
 6   sector_title     568341 non-null  string        
 7   subsector_title  568341 non-null  string        
 8   gas              568341 non-null  string        
 9   gwp100_ar5       568341 non-null  Int32         
 10  value            568341 non-null  Float64       
 11  value_units      568341 non-null  string        
 12  year             568341 non-null  datetime64[ns]
dtypes: Float64(1), Int32(1), datetime64[ns](1), string(10)
memory usage: 55.3 

In [18]:
# Sanity check: no models yet; each create_trino_table_and_dbt_metadata call with metadata adds one
dbt_dict

{'models': {}}

In [19]:
# Year-partitioned table; the large source frame is deleted once loaded to free memory
create_trino_table_and_dbt_metadata('ghg_data', essd_data_df, "array['year']", custom_meta_content, custom_meta_fields)
del(essd_data_df)


create table if not exists osc_datacommons_dev.mdt_sandbox.essd_ghg_data(
    ISO varchar,
    country varchar,
    region_ar6_6 varchar,
    region_ar6_10 varchar,
    region_ar6_22 varchar,
    region_ar6_dev varchar,
    sector_title varchar,
    subsector_title varchar,
    gas varchar,
    gwp100_ar5 integer,
    value double,
    value_units varchar,
    year timestamp(6)
) with (
partitioning = array['year'],
format = 'ORC'
)

[(True,)]


Grab the Sector, Region, 100yr GWP, and CH4_gwps tables from one of the two main ESSD workbooks (they are the same in both).

In [20]:
# Sector classification sheet from the same workbook; convert_dtypes() yields nullable string columns
essd_sectors_df = pd.read_excel(essd_bytes, sheet_name='sector_classification',
                                # nrows=100,
                                engine='openpyxl').convert_dtypes()
display(essd_sectors_df.columns)

Index(['EDGAR_code', 'fossil_bio', 'sector_title', 'description',
       'subsector_title', 'IPCC_2006'],
      dtype='object')

In [21]:
custom_meta_content = dict(
    title='Earth System Science Data (ESSD) Sector Classification',
    author=essd_content_df[['Author & contact']].squeeze(),
    contact=essd_content_df[['Author & contact']].squeeze(),
    description=essd_content_df[['Sectors']].squeeze(),
    release_date=essd_content_df[['Last date of compilation']].squeeze(),
    # How should we describe our transformative step here?
)

custom_meta_fields = dict(
    EDGAR_code=description_is("EDGAR Sector Coding [1-7][A-G][1-15][xx]"),
    fossil_bio=description_is("CO2e source - fossil or bio"),
    sector_title=description_is("IPCC ar6 Sector Names"),
    description=description_is("IPCC Sector description"),
    subsector_title=description_is("IPCC ar6 Sub-Sector Names"),
    IPCC_2006=description_is("IPCC Sector Coding List [1-5].[A-G].[1-9].[a-h].[i-iii].[2-6]"),
)

# Tag each column by the classification its description cites; IPCC wins over EDGAR.
for f, meta in custom_meta_fields.items():
    if 'IPCC' in meta['Description']:
        meta['tags'] = ['IPCC']
    elif 'EDGAR' in meta['Description']:
        meta['tags'] = ['EDGAR']

In [22]:
create_trino_table_and_dbt_metadata('sectors', essd_sectors_df, '', custom_meta_content, custom_meta_fields)  # unpartitioned


create table if not exists osc_datacommons_dev.mdt_sandbox.essd_sectors(
    EDGAR_code varchar,
    fossil_bio varchar,
    sector_title varchar,
    description varchar,
    subsector_title varchar,
    IPCC_2006 varchar
) with (

format = 'ORC'
)

[(True,)]


In [23]:
# dbt_dict['models']

In [24]:
# Region classification sheet from the same workbook; convert_dtypes() yields nullable string columns
essd_regions_df = pd.read_excel(essd_bytes, sheet_name='region_classification',
                                # nrows=100,
                                engine='openpyxl').convert_dtypes()
display(essd_regions_df.columns)

Index(['ISO', 'name', 'region_ar6_6', 'region_ar6_10', 'region_ar6_22',
       'region_ar6_dev'],
      dtype='object')

In [25]:
custom_meta_content = dict(
    title='Earth System Science Data (ESSD) Region Classification',
    author=essd_content_df[['Author & contact']].squeeze(),
    contact=essd_content_df[['Author & contact']].squeeze(),
    description=essd_content_df[['Regions']].squeeze(),
    release_date=essd_content_df[['Last date of compilation']].squeeze(),
    # How should we describe our transformative step here?
)

custom_meta_fields = dict(
    ISO=description_is("ISO 3166 3-letter code"),
    name=description_is("ISO 3166 country name"),
    region_ar6_6=description_is("IPCC ar6 6-region climate boundaries"),
    region_ar6_10=description_is("IPCC ar6 10-region climate boundaries"),
    region_ar6_22=description_is("IPCC ar6 22-region climate boundaries"),
    region_ar6_dev=description_is("IPCC ar6 developed/ldc country"),
)

# Tag each column by the standard its description cites; ISO wins over IPCC.
for f, meta in custom_meta_fields.items():
    if 'ISO' in meta['Description']:
        meta['tags'] = ['ISO']
    elif 'IPCC' in meta['Description']:
        meta['tags'] = ['IPCC']

In [26]:
create_trino_table_and_dbt_metadata('regions', essd_regions_df, '', custom_meta_content, custom_meta_fields)  # unpartitioned


create table if not exists osc_datacommons_dev.mdt_sandbox.essd_regions(
    ISO varchar,
    name varchar,
    region_ar6_6 varchar,
    region_ar6_10 varchar,
    region_ar6_22 varchar,
    region_ar6_dev varchar
) with (

format = 'ORC'
)

[(True,)]


In [27]:
# dbt_dict['models']

Now deal with the gas species. We'll annotate our dataframe with the corresponding pint unit names.

In [28]:
# GWP100 multipliers per gas; convert_dtypes() is applied after the units column is added in the next cell
essd_gwp_df = pd.read_excel(essd_bytes, sheet_name='100_yr_gwps',
                            # nrows=100,
                            dtype={'gwp_ar5':'int32'}, engine='openpyxl')

display(essd_gwp_df.columns)

Index(['gas', 'gwp_ar5'], dtype='object')

In [29]:
# Register the fossil CH4 sub-categories as named pint units distinct from (biogenic) CH4.
# NOTE(review): the 'nan' scale factor presumably makes conversion to CH4 yield NaN — confirm this is intended.
# Re-running this cell redefines the units; pint may warn about the redefinition.
ureg.define("CH4_Combustion = nan CH4")
ureg.define("CH4_Fugitive = nan CH4")
ureg.define("CH4_Process = nan CH4")

# ESSD CH4 sub-category label -> pint species name (biogenic is plain CH4; fossil variants
# use the custom units registered in the preceding cell).
ch4_dict = dict([
    ('CH4 Biogenic', 'CH4'),
    ('CH4 Fossil (Combustion)', 'CH4_Combustion'),
    ('CH4 Fossil (Fugitive)', 'CH4_Fugitive'),
    ('CH4 Fossil (Process)', 'CH4_Process'),
])

def convert_gas_to_pint_species(s):
    """Return a list with the pint species name for each gas label in Series `s`.

    Labels present in `ch4_dict` are mapped through it.  Any other label has
    'c-' replaced by 'C' and all remaining hyphens removed
    (e.g. 'HFC-134a' -> 'HFC134a').
    NOTE(review): this turns 'c-C4F8' into 'CC4F8'; confirm that is the name the
    unit registry expects.
    """
    species = []
    for gas in s.tolist():
        if gas in ch4_dict:
            species.append(ch4_dict[gas])
        else:
            species.append(gas.replace('c-', 'C').replace('-', ''))
    return species

# Add a 'units' column with the pint species name for each gas, then switch to nullable dtypes
essd_gwp_df = essd_gwp_df.assign(units=lambda x: convert_gas_to_pint_species(x.gas)).convert_dtypes()

display(essd_gwp_df.columns)

Index(['gas', 'gwp_ar5', 'units'], dtype='object')

In [30]:
custom_meta_content = {
    'title': 'Earth System Science Data (ESSD) Gas Species',
    'author': essd_content_df[['Author & contact']].squeeze(),
    'contact': essd_content_df[['Author & contact']].squeeze(),
    # The info sheet's 'Regions' text describes the region table, not this one, so describe
    # the GWP table explicitly.  TODO: use the info-sheet text for GWPs if the sheet has one.
    'description': 'IPCC AR5 100-year global warming potentials (GWP100) for each GHG gas species, with the pint unit name used for each species',
    'release_date': essd_content_df[['Last date of compilation']].squeeze(),
    # How should we describe our transformative step here?
}

custom_meta_fields = {
    'gas': description_is("GHG Gas Species"),
    'gwp_ar5': description_is("IPCC AR5 Global Warming Potential multiplier (GWP)"),
    'units': description_is("tons of GHG Gas Species"),

}

# Tag columns by topic; GHG takes precedence over IPCC.
for f in custom_meta_fields:
    if 'GHG' in custom_meta_fields[f]['Description']:
        custom_meta_fields[f]['tags'] = ['GHG']
    elif 'IPCC' in custom_meta_fields[f]['Description']:
        custom_meta_fields[f]['tags'] = ['IPCC']

In [31]:
create_trino_table_and_dbt_metadata('gwp_100yr', essd_gwp_df, '', custom_meta_content, custom_meta_fields)  # unpartitioned


create table if not exists osc_datacommons_dev.mdt_sandbox.essd_gwp_100yr(
    gas varchar,
    gwp_ar5 integer,
    units varchar
) with (

format = 'ORC'
)

[(True,)]


In [32]:
# dbt_dict['models']

In [33]:
# CH4 GWPs by sector; subsector is read as str (presumably so codes are not parsed as numbers)
essd_ch4_df = pd.read_excel(essd_bytes, sheet_name='CH4_gwps',
                            # nrows=100,
                            dtype={'gwp_ar5':'int32', 'subsector':'str'}, engine='openpyxl').convert_dtypes()

display(essd_ch4_df.columns)

Index(['sector_code', 'fossil_bio', 'gas', 'gwp_ar5_feedbacks', 'gwp_ar5',
       'description', 'subsector', 'chapter_title', 'subsector_title'],
      dtype='object')

In [34]:
custom_meta_content = {
    'title': 'Earth System Science Data (ESSD) CH4 GWPs',
    'author': essd_content_df[['Author & contact']].squeeze(),
    'contact': essd_content_df[['Author & contact']].squeeze(),
    # The info sheet's 'Regions' text describes the region table, not this one, so describe
    # the CH4 GWP table explicitly.  TODO: use the info-sheet text for CH4 GWPs if the sheet has one.
    'description': 'IPCC AR5 global warming potentials for CH4 by emission sector, with and without feedbacks',
    'release_date': essd_content_df[['Last date of compilation']].squeeze(),
    # How should we describe our transformative step here?
}

custom_meta_fields = {
    "sector_code": description_is("IPCC Sector Coding [1-7][A-G][1-15][xx]"),
    "fossil_bio": description_is("CO2e source - fossil or bio"),
    'gas': description_is("GHG Gas Species"),
    'gwp_ar5_feedbacks': description_is("IPCC AR5 Global Warming Potential Feedback multiplier (GWP)"),
    'gwp_ar5': description_is("IPCC AR5 Global Warming Potential multiplier (GWP)"),
    "description": description_is("IPCC Sector description"),
    "subsector": description_is("IPCC Sub-Sector description"),
    "chapter_title": description_is("IPCC ar6 Sector Names"),
    "subsector_title": description_is("IPCC ar6 Sub-Sector Names"),
}

# Tag columns by topic; IPCC takes precedence over GHG.
for f in custom_meta_fields:
    if 'IPCC' in custom_meta_fields[f]['Description']:
        custom_meta_fields[f]['tags'] = ['IPCC']
    elif 'GHG' in custom_meta_fields[f]['Description']:
        custom_meta_fields[f]['tags'] = ['GHG']

In [35]:
create_trino_table_and_dbt_metadata('ch4_gwp', essd_ch4_df, '', custom_meta_content, custom_meta_fields)  # unpartitioned


create table if not exists osc_datacommons_dev.mdt_sandbox.essd_ch4_gwp(
    sector_code varchar,
    fossil_bio varchar,
    gas varchar,
    gwp_ar5_feedbacks bigint,
    gwp_ar5 integer,
    description varchar,
    subsector varchar,
    chapter_title varchar,
    subsector_title varchar
) with (

format = 'ORC'
)

[(True,)]


In [36]:
# dbt_dict['models']

In [37]:
# Use the pachyderm client to check out the source XLSX file
xlsx_file_name = "essd_ghg_data_gwp100.xlsx"
f = client.get_file(("essd", "master"), "/ESSD/" + xlsx_file_name)
# Replaces the previous workbook buffer; the following cells read this workbook's sheets.
essd_bytes = io.BytesIO(f.read())

In [38]:
# This takes about 30sec to execute
df = pd.read_excel(essd_bytes, sheet_name='data',
                   # nrows=100,
                   converters={'year': lambda x: pd.to_datetime(x, format='%Y')},
                   dtype={'gwp100_ar5':'int32',
                          'CO2':'float64','CH4':'float64','N2O':'float64','Fgas':'float64','GHG':'float64'},
                   engine='openpyxl')
# Insert '<col>_units' right after each emissions column: CO2 is labelled t CO2, all others t CO2e
for col in ['CO2','CH4','N2O','Fgas','GHG']:
    df.insert(df.columns.get_loc(col)+1, col + "_units", "t CO2" if col=='CO2' else "t CO2e")

In [39]:
# Put year at the end to make for more friendly partitioning
# convert_dtypes() switches to pandas nullable dtypes before profiling and upload.
essd_gwp100_df = osc.enforce_partition_column_order(df, ['year'], inplace=False).convert_dtypes()

display(essd_gwp100_df.columns)

Index(['ISO', 'country', 'region_ar6_6', 'region_ar6_10', 'region_ar6_22',
       'region_ar6_dev', 'sector_title', 'subsector_title', 'CO2', 'CO2_units',
       'CH4', 'CH4_units', 'N2O', 'N2O_units', 'Fgas', 'Fgas_units', 'GHG',
       'GHG_units', 'year'],
      dtype='object')

In [40]:
# Generate profile for the data set and render it as an HTML report
# Note that minimal=True is required due to incompatibility between pandas 1.4.x and pandas-profiling
# This disables expensive computations such as correlations and duplicate row detection
# Refer to GitHub issue at https://github.com/ydataai/pandas-profiling/issues/911 for information

profile = ProfileReport(essd_gwp100_df, title="Profiling Report for " + xlsx_file_name, minimal=True)
# Path is relative to the notebook's working directory.
profile.to_file("../reports/profile_df_essd_ghg_data_gwp100.html")

# The code below allows generating the profile into a HTML version and display it within the notebook
# profile.to_notebook_iframe()

# The code below allows generating the profile into an interactive widget (not working)
# profile.to_widgets()

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

In [41]:
essd_gwp100_content_df = pd.read_excel(essd_bytes, sheet_name='info', header=None).dropna(axis=0).set_index(0).T
# Take the table metadata from this workbook's own 'info' sheet (read just above), not from the
# essd_ghg_data workbook's, so it describes the GWP100 dataset.
# NOTE(review): assumes this sheet uses the same labels as the essd_ghg_data 'info' sheet.
custom_meta_content = {
    'title': 'Earth System Science Data (ESSD) GWP100 Dataset',
    'author': essd_gwp100_content_df[['Author & contact']].squeeze(),
    'contact': essd_gwp100_content_df[['Author & contact']].squeeze(),
    'description': essd_gwp100_content_df[['Data description']].squeeze(),
    'release_date': essd_gwp100_content_df[['Last date of compilation']].squeeze(),
    # How should we describe our transformative step here?
}

In [42]:
essd_gwp100_metadata_df = pd.read_excel(essd_bytes, sheet_name='metadata')
# One entry per 'metadata' row keyed by 'Variable'; v==v is False for NaN, so empty cells are dropped.
custom_meta_fields = { d['Variable']: {k:v for k,v in d.items() if k!='Variable' and v==v} for d in essd_gwp100_metadata_df.to_dict('records') }
del(essd_gwp100_metadata_df)

custom_meta_fields['ISO'] = description_is("ISO 3166 3-letter code")
custom_meta_fields['country'] = description_is("ISO 3166 country name")
custom_meta_fields['region_ar6_6'] = description_is("IPCC ar6 6-region climate boundaries")
custom_meta_fields['region_ar6_10'] = description_is("IPCC ar6 10-region climate boundaries")
custom_meta_fields['region_ar6_22'] = description_is("IPCC ar6 22-region climate boundaries")
custom_meta_fields['region_ar6_dev'] = description_is("IPCC ar6 developed/ldc country")
custom_meta_fields['sector_title'] = description_is("IPCC ar6 5 Principal Sector Names")
custom_meta_fields['subsector_title'] = description_is("IPCC ar6 Sub-Sector Names")
# When this workbook is loaded, the CO2 column is labelled 't CO2' and the CH4, N2O, Fgas and
# GHG columns are labelled 't CO2e'. Describe the columns the same way so the DBT docs match the data.
custom_meta_fields['CO2'] = description_is("tons of CO2")
custom_meta_fields['CO2_units'] = description_is("pint[t CO2]")
custom_meta_fields['CH4'] = description_is("tons CO2e of CH4 (AR5 GWP100)")
custom_meta_fields['CH4_units'] = description_is("pint[t CO2e]")
custom_meta_fields['N2O'] = description_is("tons CO2e of N2O (AR5 GWP100)")
custom_meta_fields['N2O_units'] = description_is("pint[t CO2e]")
custom_meta_fields['Fgas'] = description_is("tons CO2e of Fgas (AR5 GWP100)")
custom_meta_fields['Fgas_units'] = description_is("pint[t CO2e]")
custom_meta_fields['GHG_units'] = description_is("pint[t CO2e]")
custom_meta_fields['year'] = description_is("Year of Data Observation")

# Tag columns by the standard their description cites (ISO before IPCC); year is tagged 'annual'.
for f in custom_meta_fields:
    if 'ISO' in custom_meta_fields[f]['Description']:
        custom_meta_fields[f]['tags'] = ['ISO']
    elif 'IPCC' in custom_meta_fields[f]['Description']:
        custom_meta_fields[f]['tags'] = ['IPCC']
    elif f == 'year':
        custom_meta_fields[f]['tags'] = ['annual']

The table-level and column-level metadata assembled above is recorded for the DBT schema file when the GWP100 table is created below.
The table is partitioned by year.

In [43]:
# Year-partitioned table; the source frame is deleted once loaded to free memory
create_trino_table_and_dbt_metadata('gwp100_data', essd_gwp100_df, "array['year']", custom_meta_content, custom_meta_fields)
del(essd_gwp100_df)


create table if not exists osc_datacommons_dev.mdt_sandbox.essd_gwp100_data(
    ISO varchar,
    country varchar,
    region_ar6_6 varchar,
    region_ar6_10 varchar,
    region_ar6_22 varchar,
    region_ar6_dev varchar,
    sector_title varchar,
    subsector_title varchar,
    CO2 double,
    CO2_units varchar,
    CH4 double,
    CH4_units varchar,
    N2O double,
    N2O_units varchar,
    Fgas double,
    Fgas_units varchar,
    GHG double,
    GHG_units varchar,
    year timestamp(6)
) with (
partitioning = array['year'],
format = 'ORC'
)

[(True,)]


In [44]:
# dbt_dict['models']

In [45]:
# Use the pachyderm client to check out the source XLSX file
xlsx_file_name = "essd_lulucf_data.xlsx"
f = client.get_file(("essd", "master"), "/ESSD/" + xlsx_file_name)
# Replaces the previous workbook buffer; the following cells read this workbook's sheets.
essd_bytes = io.BytesIO(f.read())

In [46]:
df = pd.read_excel(essd_bytes, sheet_name='data',
                   # nrows=100,
                   converters={'year': lambda x: pd.to_datetime(x, format='%Y')}, engine='openpyxl')

# Insert a '<model>_units' column immediately after each bookkeeping-model column
for col in ['blue', 'houghton', 'oscar', 'mean']:
    df.insert(df.columns.get_loc(col)+1, col + "_units", "t CO2e")

# Put year at the end to make for more friendly partitioning
lulucf_df = osc.enforce_partition_column_order(df, ['year'], inplace=False).convert_dtypes()

display(lulucf_df.columns)

Index(['region_ar6_6', 'region_ar6_10', 'blue', 'blue_units', 'houghton',
       'houghton_units', 'oscar', 'oscar_units', 'mean', 'mean_units', 'year'],
      dtype='object')

In [47]:
# Generate profile for the data set and render it as an HTML report
# Note that minimal=True is required due to incompatibility between pandas 1.4.x and pandas-profiling
# This disables expensive computations such as correlations and duplicate row detection
# Refer to GitHub issue at https://github.com/ydataai/pandas-profiling/issues/911 for information

profile = ProfileReport(lulucf_df, title="Profiling Report for " + xlsx_file_name, minimal=True)
# Path is relative to the notebook's working directory.
profile.to_file("../reports/profile_df_essd_lulucf.html")

# The code below allows generating the profile into a HTML version and display it within the notebook
# profile.to_notebook_iframe()

# The code below allows generating the profile into an interactive widget (not working)
# profile.to_widgets()

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

In [48]:
# This workbook's 'info' sheet uses different labels ('Author', 'Contact', 'Info').
lulucf_content_df = pd.read_excel(essd_bytes, sheet_name='info', header=None).dropna(axis=0).set_index(0).T
custom_meta_content = {
    'title': 'Earth System Science Data (ESSD) Land Use Change and Forestry Dataset',
    'author': lulucf_content_df[['Author']].squeeze(),
    'contact': lulucf_content_df[['Contact']].squeeze(),
    'description': lulucf_content_df[['Info']].squeeze(),
    # NOTE(review): release date is taken from the essd_ghg_data workbook's info sheet, not this one — confirm it applies.
    'release_date': essd_content_df[['Last date of compilation']].squeeze(),
    # How should we describe our transformative step here?
}

In [49]:
custom_meta_fields = {
    'region_ar6_6': description_is("IPCC ar6 6-region climate boundaries"),
    'region_ar6_10': description_is("IPCC ar6 10-region climate boundaries"),
}

custom_meta_fields['blue'] = description_is("land use bookkeeping model BLUE")
custom_meta_fields['houghton'] = description_is("land use bookkeeping model of Houghton & Nassikas")
custom_meta_fields['oscar'] = description_is("land use bookkeeping model OSCAR")
custom_meta_fields['mean'] = description_is("land use bookkeeping model mean of (BLUE, Houghton & Nassikas, OSCAR)")
# The *_units columns are filled with 't CO2e' when this workbook is loaded; describe them the same
# way so the DBT docs match the stored values.
for k in ['blue', 'houghton', 'oscar', 'mean']:
    custom_meta_fields[f"{k}_units"] = description_is("pint[t CO2e]")
custom_meta_fields['year'] = description_is("Year of Data Observation")

# IPCC-based columns are tagged 'IPCC'; year is tagged 'annual'.
for f in custom_meta_fields:
    if 'IPCC' in custom_meta_fields[f]['Description']:
        custom_meta_fields[f]['tags'] = ['IPCC']
    elif f == 'year':
        custom_meta_fields[f]['tags'] = ['annual']

In [50]:
# Year-partitioned table; the source frame is deleted once loaded
create_trino_table_and_dbt_metadata('lulucf', lulucf_df, "array['year']", custom_meta_content, custom_meta_fields)
del(lulucf_df)


create table if not exists osc_datacommons_dev.mdt_sandbox.essd_lulucf(
    region_ar6_6 varchar,
    region_ar6_10 varchar,
    blue double,
    blue_units varchar,
    houghton double,
    houghton_units varchar,
    oscar double,
    oscar_units varchar,
    mean double,
    mean_units varchar,
    year timestamp(6)
) with (
partitioning = array['year'],
format = 'ORC'
)

[(True,)]


In [51]:
# dbt_dict['models']

Write out the collected metadata to the DBT schema YAML file

In [52]:
# Opened here, written by the next cell, and closed by dbt_yml.close() in the cell after it
dbt_yml = open("/opt/app-root/src/essd-ingest-pipeline/dbt/essd_transform/models/essd_schema.yml", "w", encoding="utf-8")

In [53]:
import json  # used to emit YAML-safe quoted values


def _yaml_value(value):
    """Render a metadata value for the hand-written DBT schema YAML.

    Lists (e.g. column ``tags``) become flow sequences of quoted strings.  Every
    other value is converted to ``str`` and emitted as a JSON string, which is
    also a valid YAML double-quoted scalar.  Quoting keeps free-text
    descriptions that contain ': ', ' #', a leading '[' / '&' / '*', or
    newlines from corrupting the YAML.
    """
    if isinstance(value, (list, tuple)):
        return json.dumps([str(item) for item in value], ensure_ascii=False)
    return json.dumps(str(value), ensure_ascii=False)


yml_lines = ["version: 2", "", "models:"]
for name, model in dbt_dict['models'].items():
    yml_lines.append(f"  - name: {name}")
    yml_lines.append(f"    description: {_yaml_value(model['description'])}")
    # Models registered without column metadata have no 'columns' entry.
    columns = model.get('columns', {})
    if columns:
        yml_lines.append("")
        yml_lines.append("    columns:")
        for col, col_meta in columns.items():
            yml_lines.append(f"      - name: {col}")
            for key, value in col_meta.items():
                yml_lines.append(f"        {key}: {_yaml_value(value)}")
    yml_lines.append("")  # blank line between models
print("\n".join(yml_lines), file=dbt_yml)

In [54]:
dbt_yml.close()  # flush and close the schema YAML opened earlier