## Load WRI Power Plant data from the 2019 dataset (see https://datasets.wri.org/dataset/globalpowerplantdatabase for the original source)

Copyright (C) 2021 OS-Climate

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

### A local copy is stored in the landing S3 bucket (S3_LANDING_BUCKET) at: WRI/global_power_plant_database_v_1_3/global_power_plant_database.csv
### To tidy the data, we factor it into three tables:

* **wri_plants** (all the fixed data about each plant)
* **wri_annual_gwh** (per plant/per year annual generation in GWh)
* **wri_estimated_gwh** (per plant/per year estimated generation in GWh, with a note naming the estimation method)

### The next step is to enrich with OS-C Factor metadata

Contributed by Michael Tiemann (GitHub: MichaelTiemannOSC)

Load Credentials

In [1]:
# From the AWS Account page, copy the export scripts from the appropriate role using the "Command Line or Programmatic Access" link
# Paste the copied text into ~/credentials.env

from dotenv import dotenv_values, load_dotenv
import os
import pathlib

# Where to look for credentials.env: an explicit CREDENTIAL_DOTENV_DIR wins,
# otherwise the working directory ($PWD), otherwise the default app root.
_fallback_dir = os.environ.get('PWD', '/opt/app-root/src')
dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', _fallback_dir)
dotenv_path = pathlib.Path(dotenv_dir) / 'credentials.env'

# The file is optional. When present, its values override same-named
# environment variables (override=True). When absent, the environment is used as-is.
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path, override=True)

Create an S3 resource for the bucket holding source data

In [2]:
import boto3
# Connection settings for the landing endpoint that holds the raw source data
_landing_settings = {
    'endpoint_url': os.environ['S3_LANDING_ENDPOINT'],
    'aws_access_key_id': os.environ['S3_LANDING_ACCESS_KEY'],
    'aws_secret_access_key': os.environ['S3_LANDING_SECRET_KEY'],
}
s3_resource = boto3.resource(service_name="s3", **_landing_settings)
bucket = s3_resource.Bucket(os.environ['S3_LANDING_BUCKET'])

# Body of the GET response for the v1.3 CSV; pandas reads it in the next cell
_wri_key = 'WRI/global_power_plant_database_v_1_3/global_power_plant_database.csv'
wri_file = bucket.Object(_wri_key).get()['Body']

Load WRI data file using pandas *read_csv*

In [3]:
import pandas as pd
import numpy as np

# Pin a few column dtypes explicitly. Year-like columns are left alone:
# because NaN cannot be converted to int type, they cannot use an int dtype.
# other_fuel3 is forced to str (presumably because it is mostly empty and would
# otherwise be inferred as float -- TODO confirm).
_wri_dtypes = {
    'latitude': 'float64',
    'longitude': 'float64',
    'capacity_mw': 'float64',
    'other_fuel3': 'str',
}
wri_df = pd.read_csv(wri_file, dtype=_wri_dtypes)

display(wri_df.columns)

Index(['country', 'country_long', 'name', 'gppd_idnr', 'capacity_mw',
       'latitude', 'longitude', 'primary_fuel', 'other_fuel1', 'other_fuel2',
       'other_fuel3', 'commissioning_year', 'owner', 'source', 'url',
       'geolocation_source', 'wepp_id', 'year_of_capacity_data',
       'generation_gwh_2013', 'generation_gwh_2014', 'generation_gwh_2015',
       'generation_gwh_2016', 'generation_gwh_2017', 'generation_gwh_2018',
       'generation_gwh_2019', 'generation_data_source',
       'estimated_generation_gwh_2013', 'estimated_generation_gwh_2014',
       'estimated_generation_gwh_2015', 'estimated_generation_gwh_2016',
       'estimated_generation_gwh_2017', 'estimated_generation_note_2013',
       'estimated_generation_note_2014', 'estimated_generation_note_2015',
       'estimated_generation_note_2016', 'estimated_generation_note_2017'],
      dtype='object')

### Melt the generation data into a tidy (long) format, with one row per plant per year, dropping NA values

In [4]:
# Static, per-plant attributes (everything except the per-year generation columns)
_plant_columns = [
    'country', 'country_long', 'name', 'gppd_idnr', 'capacity_mw',
    'latitude', 'longitude', 'primary_fuel', 'other_fuel1', 'other_fuel2',
    'other_fuel3', 'commissioning_year', 'owner', 'source', 'url',
    'geolocation_source', 'wepp_id', 'year_of_capacity_data', 'generation_data_source',
]
wri_plants = wri_df[_plant_columns]

# Reported generation: generation_gwh_2013 .. generation_gwh_2019
wri_id_vars = ['gppd_idnr']
wri_value_vars = [f'generation_gwh_{yr}' for yr in range(2013, 2020)]

# Long format: one row per (plant, year). The year is the numeric suffix of
# the original column name. Rows without reported generation are dropped.
wri_annual_gwh = wri_df.melt(wri_id_vars, wri_value_vars, var_name='year', value_name='generation_gwh')
wri_annual_gwh['year'] = wri_annual_gwh['year'].str.rsplit('_', n=1).str[-1].astype('int64')
wri_annual_gwh = wri_annual_gwh.dropna(subset=['generation_gwh'])

display(wri_annual_gwh)

Unnamed: 0,gppd_idnr,year,generation_gwh
337,AUS0000065,2013,89.595278
340,AUS0000114,2013,1095.676944
342,AUS0000264,2013,204.804444
345,AUS0000081,2013,132.456667
346,AUS0000113,2013,4.194444
...,...,...,...
244154,USA0056871,2019,22.647000
244155,USA0001368,2019,-0.045000
244156,USA0057648,2019,1.211000
244157,USA0061574,2019,1.589000


In [5]:
wri_id_vars = ['gppd_idnr']
_estimate_years = range(2013, 2018)  # estimates cover 2013 .. 2017

# Estimated generation, one row per (plant, year); year parsed from the column suffix
wri_value_vars = [f'estimated_generation_gwh_{yr}' for yr in _estimate_years]
wri_estimated_gwh = wri_df.melt(wri_id_vars, wri_value_vars, var_name='year', value_name='estimated_generation_gwh')
wri_estimated_gwh['year'] = wri_estimated_gwh['year'].str.rsplit('_', n=1).str[-1].astype('int64')

# The matching estimation-method notes, melted the same way so they can be merged on (gppd_idnr, year)
wri_value_vars = [f'estimated_generation_note_{yr}' for yr in _estimate_years]
wri_estimated_note = wri_df.melt(wri_id_vars, wri_value_vars, var_name='year', value_name='estimated_generation_note')
wri_estimated_note['year'] = wri_estimated_note['year'].str.rsplit('_', n=1).str[-1].astype('int64')

display(wri_estimated_gwh)

Unnamed: 0,gppd_idnr,year,estimated_generation_gwh
0,GEODB0040538,2013,123.77
1,WKS0070144,2013,18.43
2,WKS0071196,2013,18.64
3,GEODB0040541,2013,225.06
4,GEODB0040534,2013,406.16
...,...,...,...
174675,WRI1022386,2017,183.79
174676,WRI1022384,2017,73.51
174677,WRI1022380,2017,578.32
174678,GEODB0040404,2017,2785.10


Merge the estimates with their notes so that each (plant, year) estimate is paired 1:1 with its note, then drop rows that have no estimate

In [6]:
# Attach each estimate's method note. validate="one_to_one" raises if either
# side has duplicate (gppd_idnr, year) keys. Rows lacking an estimate are dropped.
wri_estimated_gwh = (
    wri_estimated_gwh
    .merge(wri_estimated_note, on=['gppd_idnr', 'year'], validate="one_to_one")
    .dropna(subset=['estimated_generation_gwh'])
)

display(wri_estimated_gwh)

Unnamed: 0,gppd_idnr,year,estimated_generation_gwh,estimated_generation_note
0,GEODB0040538,2013,123.77,HYDRO-V1
1,WKS0070144,2013,18.43,SOLAR-V1-NO-AGE
2,WKS0071196,2013,18.64,SOLAR-V1-NO-AGE
3,GEODB0040541,2013,225.06,HYDRO-V1
4,GEODB0040534,2013,406.16,HYDRO-V1
...,...,...,...,...
174675,WRI1022386,2017,183.79,CAPACITY-FACTOR-V1
174676,WRI1022384,2017,73.51,CAPACITY-FACTOR-V1
174677,WRI1022380,2017,578.32,HYDRO-V1
174678,GEODB0040404,2017,2785.10,CAPACITY-FACTOR-V1


In [7]:
# Inspect the static plant-attribute table split out of wri_df in the melt cell above
display(wri_plants)

Unnamed: 0,country,country_long,name,gppd_idnr,capacity_mw,latitude,longitude,primary_fuel,other_fuel1,other_fuel2,other_fuel3,commissioning_year,owner,source,url,geolocation_source,wepp_id,year_of_capacity_data,generation_data_source
0,AFG,Afghanistan,Kajaki Hydroelectric Power Plant Afghanistan,GEODB0040538,33.0,32.3220,65.1190,Hydro,,,,,,GEODB,http://globalenergyobservatory.org,GEODB,1009793,2017.0,
1,AFG,Afghanistan,Kandahar DOG,WKS0070144,10.0,31.6700,65.7950,Solar,,,,,,Wiki-Solar,https://www.wiki-solar.org,Wiki-Solar,,,
2,AFG,Afghanistan,Kandahar JOL,WKS0071196,10.0,31.6230,65.7920,Solar,,,,,,Wiki-Solar,https://www.wiki-solar.org,Wiki-Solar,,,
3,AFG,Afghanistan,Mahipar Hydroelectric Power Plant Afghanistan,GEODB0040541,66.0,34.5560,69.4787,Hydro,,,,,,GEODB,http://globalenergyobservatory.org,GEODB,1009795,2017.0,
4,AFG,Afghanistan,Naghlu Dam Hydroelectric Power Plant Afghanistan,GEODB0040534,100.0,34.6410,69.7170,Hydro,,,,,,GEODB,http://globalenergyobservatory.org,GEODB,1009797,2017.0,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
34931,ZMB,Zambia,Ndola,WRI1022386,50.0,-12.9667,28.6333,Oil,,,,,ZESCO,Energy Regulation Board of Zambia,http://www.erb.org.zm/reports/EnergySectorRepo...,Power Africa,1089529,,
34932,ZMB,Zambia,Nkana,WRI1022384,20.0,-12.8167,28.2000,Oil,,,,,ZESCO,Energy Regulation Board of Zambia,http://www.erb.org.zm/reports/EnergySectorRepo...,Power Africa,1043097,,
34933,ZMB,Zambia,Victoria Falls,WRI1022380,108.0,-17.9167,25.8500,Hydro,,,,,ZESCO,Energy Regulation Board of Zambia,http://www.erb.org.zm/reports/EnergySectorRepo...,Power Africa,1033763,,
34934,ZWE,Zimbabwe,Hwange Coal Power Plant Zimbabwe,GEODB0040404,920.0,-18.3835,26.4700,Coal,,,,,,GEODB,http://globalenergyobservatory.org,GEODB,1033856,2017.0,


Upload the tables as Parquet files to the S3 development bucket

In [8]:
schemaname = 'wri_gppd'
# Trino table name -> DataFrame holding its rows
tablename_to_df = {
    'plants': wri_plants,
    'annual_gwh': wri_annual_gwh,
    'estimated_gwh': wri_estimated_gwh,
}

# S3 client for the development endpoint (configured separately from the landing bucket)
s3 = boto3.client(
    service_name="s3",
    endpoint_url=os.environ['S3_DEV_ENDPOINT'],
    aws_access_key_id=os.environ['S3_DEV_ACCESS_KEY'],
    aws_secret_access_key=os.environ['S3_DEV_SECRET_KEY'],
)

# Stage each table as a local Parquet file, then upload it under
# trino/<schema>/<table>/<table>.parquet in the dev bucket
for tablename, df in tablename_to_df.items():
    local_parquet = f'/tmp/{schemaname}.{tablename}.parquet'
    df.to_parquet(local_parquet, index=False)

    s3.upload_file(
        Bucket=os.environ['S3_DEV_BUCKET'],
        Key=f'trino/{schemaname}/{tablename}/{tablename}.parquet',
        Filename=local_parquet,
    )

Define helpers that normalize column names and map pandas (Parquet) column types to Trino SQL types

In [9]:
import re
import pandas as pd
_wsdedup = re.compile(r"\s+")
_usdedup = re.compile(r"__+")
_rmpunc = re.compile(r"[,.()&$/+-]+")

# Word -> abbreviation substitutions, applied in this order.
# Other words common in sample names, with no agreed abbreviation yet:
# inference (inf), emissions (emis), intensity (int), reported (rep), revenue (rev).
_ABBREVIATIONS = (
    ("average", "avg"),
    ("maximum", "max"),
    ("minimum", "min"),
    ("absolute", "abs"),
    ("source", "src"),
    ("distribution", "dist"),
)

# 63 seems to be a common max column name length
def snakify(name, maxlen=63):
    """Convert a name (or a list of names) into a snake_case SQL-friendly identifier.

    The name is stringified, case-folded and trimmed. Hyphens become underscores,
    the punctuation , . ( ) & $ / + - is removed, whitespace runs become a single
    underscore, and repeated underscores are collapsed. A few long words are then
    abbreviated, and the result is truncated to ``maxlen`` characters.

    Args:
        name: a single value, or a list of values to convert element-wise.
        maxlen: maximum length of each resulting identifier.

    Returns:
        The converted string, or a list of converted strings if ``name`` is a list.
    """
    if isinstance(name, list):
        # Propagate maxlen so list calls truncate exactly like scalar calls
        return [snakify(e, maxlen=maxlen) for e in name]
    w = str(name).casefold().strip()
    w = w.replace("-", "_")
    w = _rmpunc.sub("", w)
    w = _wsdedup.sub("_", w)
    w = _usdedup.sub("_", w)
    for word, abbrev in _ABBREVIATIONS:
        w = w.replace(word, abbrev)
    return w[:maxlen]

def snakify_columns(df, inplace=False, maxlen=63):
    """Rename a DataFrame's columns with snakify, truncating each to maxlen characters.

    Args:
        df: the DataFrame whose columns are renamed.
        inplace: passed through to DataFrame.rename.
        maxlen: maximum length of each new column name.

    Returns:
        The result of DataFrame.rename: a renamed copy, or None when inplace=True.

    Raises:
        ValueError: if two columns map to the same snakified name.
    """
    icols = df.columns.to_list()
    ocols = snakify(icols, maxlen=maxlen)
    if len(set(ocols)) < len(ocols):
        raise ValueError("remapped column names were not unique!")
    # Rename with exactly the names that were checked for uniqueness. Previously
    # snakify was re-run here without maxlen, which ignored a non-default maxlen.
    rename_map = dict(zip(icols, ocols))
    return df.rename(columns=rename_map, inplace=inplace)

# Map bare year column labels ('1900' .. '2099') to identifier-safe names ('y1900' .. 'y2099')
rename_year_columns = {str(year): 'y{yr}'.format(yr=year) for year in range(1900, 2100)}

# pandas dtype name (as produced by str(dtype)) -> Trino SQL column type
_p2smap = {
    'object': 'varchar',
    'string': 'varchar',
    'str': 'varchar',
    'bool': 'boolean',
    'boolean': 'boolean',
    'float32': 'real',
    'Float32': 'real',
    'float64': 'double',
    'Float64': 'double',
    'int8': 'tinyint',
    'Int8': 'tinyint',
    'int16': 'smallint',
    'Int16': 'smallint',
    'int32': 'integer',
    'Int32': 'integer',
    'int64': 'bigint',
    'Int64': 'bigint',
    'category': 'varchar',
    'datetime64[ns, UTC]': 'timestamp',
    'datetime64[ns]': 'timestamp'
}

def pandas_type_to_sql(pt):
    """Return the Trino SQL type for a pandas dtype.

    Args:
        pt: a dtype name such as 'float64', or a dtype object (it is stringified).

    Returns:
        The SQL type name, e.g. 'double' or 'varchar'.

    Raises:
        ValueError: if the dtype has no known SQL mapping.
    """
    st = _p2smap.get(str(pt))
    if st is not None:
        return st
    raise ValueError("unexpected pandas column type '{pt}'".format(pt=pt))

def generate_table_schema_pairs(df, overrides=None):
    """Render a DataFrame's columns as the column list of a CREATE TABLE statement.

    Each column becomes a line "    <name> <sql type>", and the lines are joined
    with ",\\n". The SQL type is derived from the column's pandas dtype via
    pandas_type_to_sql. If the column name is a key in ``overrides``, that value
    is used verbatim instead.

    Args:
        df: the DataFrame whose columns/dtypes define the schema.
        overrides: optional dict mapping column name -> SQL type string.

    Returns:
        The column definitions as a single string.

    Raises:
        ValueError: (from pandas_type_to_sql) for an unmapped dtype of a
            column that is not overridden.
    """
    overrides = overrides or {}
    lines = []
    for name, dtype in zip(df.columns.to_list(), df.dtypes.to_list()):
        sql_type = overrides.get(name)
        if sql_type is None:
            sql_type = pandas_type_to_sql(str(dtype))
        lines.append("    {n} {t}".format(n=name, t=sql_type))
    return ",\n".join(lines)

In [10]:
import trino

# Connection settings for the Trino coordinator; JWT auth over HTTPS with
# certificate verification enabled
_trino_settings = dict(
    host=os.environ['TRINO_HOST'],
    port=int(os.environ['TRINO_PORT']),
    user=os.environ['TRINO_USER'],
    http_scheme='https',
    auth=trino.auth.JWTAuthentication(os.environ['TRINO_PASSWD']),
    verify=True,
)
conn = trino.dbapi.connect(**_trino_settings)
cur = conn.cursor()

In [11]:
# Sanity check: list the schemas in the catalog to confirm the Trino connection works
cur.execute('show schemas in osc_datacommons_dev')
available_schemas = cur.fetchall()
available_schemas

[['_metadata'],
 ['aicoe_osc_demo'],
 ['climate_itr_tool_project'],
 ['company_data'],
 ['default'],
 ['defaultschema1'],
 ['demo'],
 ['eje'],
 ['epacems'],
 ['epacems_y95_al'],
 ['information_schema'],
 ['metastore'],
 ['os_climate_corporate_data_project'],
 ['osc_corp_data'],
 ['pudl'],
 ['rmi'],
 ['rmi_utility_transition_hub'],
 ['team1'],
 ['team2'],
 ['testaccessschema1'],
 ['urgentem'],
 ['wri'],
 ['wri_gppd']]

In [12]:
# Ensure the target schema exists before any tables are defined in it
create_schema_sql = f'create schema if not exists osc_datacommons_dev.{schemaname}'
cur.execute(create_schema_sql)
cur.fetchall()

[[True]]

In [13]:
# Drop every table this notebook defines, so that the "create table if not exists"
# statements that follow recreate them with the current column definitions.
# Iterating tablename_to_df drops all of them. The previous version relied on the
# `tablename` left over from the upload loop and therefore dropped only the last table.
for tablename in tablename_to_df:
    cur.execute(f'drop table if exists osc_datacommons_dev.{schemaname}.{tablename}')
    cur.fetchall()

[[True]]

In [14]:
# Define one external, Parquet-backed Trino table per DataFrame, pointing at the
# trino/<schema>/<table>/ prefix the files were uploaded to.
for tablename, df in tablename_to_df.items():
    schema = generate_table_schema_pairs(df)

    tabledef = (
        f"create table if not exists osc_datacommons_dev.{schemaname}.{tablename}(\n"
        f"{schema}\n"
        ") with (\n"
        "    format = 'parquet',\n"
        f"    external_location = 's3a://{os.environ['S3_DEV_BUCKET']}/trino/{schemaname}/{tablename}/'\n"
        ")"
    )
    print(tabledef)

    # tables created externally may not show up immediately in cloud-beaver
    cur.execute(tabledef)
    cur.fetchall()

create table if not exists osc_datacommons_dev.wri_gppd.plants(
    country varchar,
    country_long varchar,
    name varchar,
    gppd_idnr varchar,
    capacity_mw double,
    latitude double,
    longitude double,
    primary_fuel varchar,
    other_fuel1 varchar,
    other_fuel2 varchar,
    other_fuel3 varchar,
    commissioning_year double,
    owner varchar,
    source varchar,
    url varchar,
    geolocation_source varchar,
    wepp_id varchar,
    year_of_capacity_data double,
    generation_data_source varchar
) with (
    format = 'parquet',
    external_location = 's3a://ocp-odh-os-demo-s3/trino/wri_gppd/plants/'
)
create table if not exists osc_datacommons_dev.wri_gppd.annual_gwh(
    gppd_idnr varchar,
    year bigint,
    generation_gwh double
) with (
    format = 'parquet',
    external_location = 's3a://ocp-odh-os-demo-s3/trino/wri_gppd/annual_gwh/'
)
create table if not exists osc_datacommons_dev.wri_gppd.estimated_gwh(
    gppd_idnr varchar,
    year bigint,
    

## Load metadata describing this ingestion into the Trino metadata store

### The schema is *metastore*, and the table names are *meta_schema*, *meta_table*, *meta_field*

In [15]:
metastore = 'metastore'

# Names of the metadata tables for the three layers of metadata: schema, table, and field.
meta_schema = 'meta_schema'
meta_table = 'meta_table'
meta_field = 'meta_field'

# Column layout of each layer's (initially empty) local metadata frame.
# These frames are local to this ingestion process; they are to be merged
# with the master metadata tables later.
_metadata_columns = {
    # a single entry per data source (columns are added as it is filled in)
    meta_schema: [],
    # one or more entries per data source, one per table
    meta_table: ['tname', 'parent_schema', 'source', 'processing_pipeline'],
    # one or more entries per table, one per field
    meta_field: ['fname', 'parent_table', 'type', 'dimension', 'description'],
}
metadata_to_df = {
    layer: pd.DataFrame(data=[], columns=cols)
    for layer, cols in _metadata_columns.items()
}

cur.execute(f'create schema if not exists osc_datacommons_dev.{metastore}')
cur.fetchall()

[[True]]

Create the actual metadata for the source. In this case, it is WRI_GPPD.

The quoted text comes from the README.txt file distributed with the dataset.

In [16]:
# Build the single meta_schema record for this data source. Values are collected
# in a dict and turned into a one-row DataFrame at the end. Assigning scalars to
# the empty template frame (the previous approach) creates columns with zero rows,
# which silently discarded every value.
metadata_text = """Title: Global Power Plant Database
Description: A comprehensive, global, open source database of power plants
Version: 1.3.0
Release Date: 2021-06-02
URI: http://datasets.wri.org/dataset/globalpowerplantdatabase
Copyright: Copyright 2018-2021 World Resources Institute and Data Contributors
License: Creative Commons Attribution 4.0 International -- CC BY 4.0
Contact: powerexplorer@wri.org
Citation: Global Energy Observatory, Google, KTH Royal Institute of Technology in Stockholm, Enipedia, World Resources Institute. 2019. Global Power Plant Database. Published on Resource Watch and Google Earth Engine. http://resourcewatch.org/ https://earthengine.google.com/  """

_schema_record = {}
for line in metadata_text.split('\n'):
    # "Key: value" -> snake_case key. Split only once so values may contain ':' (URLs).
    k, v = line.split(':', 1)
    # strip the space after ':' and any trailing padding
    _schema_record[snakify(k)] = v.strip()

_schema_record['abstract'] = """An affordable, reliable, and environmentally sustainable power sector is central to modern society.
Governments, utilities, and companies make decisions that both affect and depend on the power sector.
For example, if governments apply a carbon price to electricity generation, it changes how plants run and which plants are built over time.
On the other hand, each new plant affects the electricity generation mix, the reliability of the system, and system emissions.
Plants also have significant impact on climate change, through carbon dioxide (CO2) emissions; on water stress, through water withdrawal and consumption; and on air quality, through sulfur oxides (SOx), nitrogen oxides (NOx), and particulate matter (PM) emissions.

The Global Power Plant Database is an open-source open-access dataset of grid-scale (1 MW and greater) electricity generating facilities operating across the world.

The Database currently contains nearly 35000 power plants in 167 countries, representing about 72% of the world's capacity.
Entries are at the facility level only, generally defined as a single transmission grid connection point.
Generation unit-level information is not currently available. 
The methodology for the dataset creation is given in the World Resources Institute publication "A Global Database of Power Plants" [0].
Associated code for the creation of the dataset can be found on GitHub [1].
See also the technical note published in early 2020 on an improved methodology to estimate annual generation [2].

To stay updated with news about the project and future database releases, please sign up for our newsletter for the release announcement [3].


[0] www.wri.org/publication/global-power-plant-database
[1] https://github.com/wri/global-power-plant-database
[2] https://www.wri.org/publication/estimating-power-plant-generation-global-power-plant-database
[3] https://goo.gl/ivTvkd"""
_schema_record['name'] = 'WRI_GPPD'

_mds_df = pd.DataFrame([_schema_record])
metadata_to_df[meta_schema] = _mds_df

WRI_GPPD is ingested into the three tables listed in *tablename_to_df*

In [17]:
# One meta_table row per ingested table. parent_schema records the Trino schema
# the tables were created in (schemaname). The previous value, meta_schema, is
# the name of a metastore table, identical for every row, and carries no information.
_mdt_df = metadata_to_df[meta_table]
_table_names = list(tablename_to_df.keys())
# Assigning Series (rather than scalars) to the empty template gives it one row per
# table while the untouched columns (source, processing_pipeline) stay object dtype.
_mdt_df['tname'] = pd.Series(_table_names)
_mdt_df['parent_schema'] = pd.Series([schemaname] * len(_table_names))

metadata_to_df[meta_table] = _mdt_df

Create the metadata for all the fields in all the tables

In [18]:
field_text = """`country` (text): 3 character country code corresponding to the ISO 3166-1 alpha-3 specification [https://www.iso.org/iso-3166-country-codes.html]
`country_long` (text): longer form of the country designation
`name` (text): name or title of the power plant, generally in Romanized form
`gppd_idnr` (text): 10 or 12 character identifier for the power plant
`capacity_mw` (number): electrical generating capacity in megawatts
`latitude` (number): geolocation in decimal degrees; WGS84 (EPSG:4326)
`longitude` (number): geolocation in decimal degrees; WGS84 (EPSG:4326)
`primary_fuel` (text): energy source used in primary electricity generation or export
`other_fuel1` (text): energy source used in electricity generation or export
`other_fuel2` (text): energy source used in electricity generation or export
`other_fuel3` (text): energy source used in electricity generation or export
`commissioning_year` (number): year of plant operation, weighted by unit-capacity when data is available
`owner` (text): majority shareholder of the power plant, generally in Romanized form
`source` (text): entity reporting the data; could be an organization, report, or document, generally in Romanized form
`url` (text): web document corresponding to the `source` field
`geolocation_source` (text): attribution for geolocation information
`wepp_id` (text): a reference to a unique plant identifier in the widely-used PLATTS-WEPP database.
`year_of_capacity_data` (number): year the capacity information was reported
`generation_data_source` (text): attribution for the reported generation information"""

field_descs = [line.split(': ')[1] for line in field_text.split('\n')]

In [19]:
# Build one frame of field metadata per ingested table and concatenate them once.
# Rebuilding from scratch keeps this cell idempotent (re-running it no longer appends
# duplicate rows), and pd.concat replaces DataFrame.append, removed in pandas 2.0.
_mdf_columns = list(metadata_to_df[meta_field].columns)
_field_frames = []
for tablename, df in tablename_to_df.items():
    table_fields = pd.DataFrame({
        'fname': list(df.columns),
        'parent_table': [tablename] * len(df.columns),
        'type': [str(t) for t in df.dtypes],
    })
    if tablename == 'plants':
        # field_descs is positional: one description per plants column, in order
        assert len(field_descs) == len(df.columns), "field_descs must align 1:1 with the plants columns"
        table_fields['description'] = field_descs
    # Descriptions for fields of the melted tables are filled in by name in the next cell
    _field_frames.append(table_fields)

_mdf_df = (
    pd.concat(_field_frames, ignore_index=True)
    .reindex(columns=_mdf_columns)
    # Keep every column object dtype so all-missing columns (e.g. dimension) still
    # map to varchar rather than double when the metastore table is defined.
    .astype(object)
)

In [20]:
# (field name, metadata column, value) annotations, applied in order to every
# row whose fname matches: units for numeric fields, and descriptions for
# the fields introduced by melting.
_field_annotations = [
    ('capacity_mw', 'dimension', 'MW'),
    ('latitude', 'dimension', 'degrees'),
    ('longitude', 'dimension', 'degrees'),
    ('commissioning_year', 'dimension', 'year'),
    ('year_of_capacity_data', 'dimension', 'year'),
    ('year', 'dimension', 'year'),
    ('year', 'description', 'year of report'),
    ('gppd_idnr', 'description', 'unique index into plants table'),
    ('generation_gwh', 'dimension', 'GWh'),
    ('generation_gwh', 'description', 'electricity generation in gigawatt-hours reported for the year'),
    ('estimated_generation_gwh', 'dimension', 'GWh'),
    ('estimated_generation_gwh', 'description', 'estimated electricity generation in gigawatt-hours for the year'),
    ('estimated_generation_note', 'description', 'label of the model/method used to estimate generation for the year'),
]
for _fname, _meta_col, _value in _field_annotations:
    _mdf_df.loc[_mdf_df['fname'] == _fname, _meta_col] = _value

metadata_to_df[meta_field] = _mdf_df

### Caveat: the following is logically wrong because it simply slams data into the metastore. The next step is to update/insert (upsert) the metadata, in case this is not the first or only ingestion process to write to the metastore.

In [21]:
# Define one external, Parquet-backed table per metadata layer in the metastore schema,
# located under trino/<metastore>/<table>/ in the dev bucket.
for tablename, df in metadata_to_df.items():
    schema = generate_table_schema_pairs(df)
    tabledef = (
        f"create table if not exists osc_datacommons_dev.{metastore}.{tablename}(\n"
        f"{schema}\n"
        ") with (\n"
        "    format = 'parquet',\n"
        f"    external_location = 's3a://{os.environ['S3_DEV_BUCKET']}/trino/{metastore}/{tablename}/'\n"
        ")"
    )
    print(tabledef)

    cur.execute(tabledef)
    cur.fetchall()

create table if not exists osc_datacommons_dev.metastore.meta_schema(
    title varchar,
    description varchar,
    version varchar,
    release_date varchar,
    uri varchar,
    copyright varchar,
    license varchar,
    contact varchar,
    citation varchar,
    abstract varchar,
    name varchar
) with (
    format = 'parquet',
    external_location = 's3a://ocp-odh-os-demo-s3/trino/metastore/meta_schema/'
)
create table if not exists osc_datacommons_dev.metastore.meta_table(
    tname varchar,
    parent_schema varchar,
    source varchar,
    processing_pipeline varchar
) with (
    format = 'parquet',
    external_location = 's3a://ocp-odh-os-demo-s3/trino/metastore/meta_table/'
)
create table if not exists osc_datacommons_dev.metastore.meta_field(
    fname varchar,
    parent_table varchar,
    type varchar,
    dimension varchar,
    description varchar
) with (
    format = 'parquet',
    external_location = 's3a://ocp-odh-os-demo-s3/trino/metastore/meta_field/'
)


In [22]:
# Inspect the field-level metadata assembled above (one row per column of each ingested table)
metadata_to_df[meta_field]

Unnamed: 0,fname,parent_table,type,dimension,description
0,country,plants,object,,3 character country code corresponding to the ...
1,country_long,plants,object,,longer form of the country designation
2,name,plants,object,,"name or title of the power plant, generally in..."
3,gppd_idnr,plants,object,,10 or 12 character identifier for the power plant
4,capacity_mw,plants,float64,MW,electrical generating capacity in megawatts
5,latitude,plants,float64,degrees,geolocation in decimal degrees; WGS84 (EPSG:4326)
6,longitude,plants,float64,degrees,geolocation in decimal degrees; WGS84 (EPSG:4326)
7,primary_fuel,plants,object,,energy source used in primary electricity gene...
8,other_fuel1,plants,object,,energy source used in electricity generation o...
9,other_fuel2,plants,object,,energy source used in electricity generation o...
