## Load SFI GeoAsset Steel and Cement Plant data from 2020 dataset (see https://www.cgfi.ac.uk/spatial-finance-initiative/database-downloads/ for the original source)

Copyright (C) 2021 OS-Climate

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

### We have a local copy in the S3 landing bucket (named by the `S3_LANDING_BUCKET` environment variable): `SFI_GeoAsset/SFI-Global-Cement-Database-July-2021.xlsx` and `SFI_GeoAsset/SFI-Global-Steel-Database-July-2021.xlsx`

Contributed by Michael Tiemann (Github: MichaelTiemannOSC)

Load Credentials and Data Commons libraries

In [1]:
# From the AWS Account page, copy the export scripts from the appropriate role using the "Command Line or Programmatic Access" link
# Paste the copied text into ~/credentials.env

from dotenv import dotenv_values, load_dotenv
import os
import pathlib

# Look for credentials.env in CREDENTIAL_DOTENV_DIR when that variable is set,
# otherwise in $PWD, and finally in /opt/app-root/src when neither is defined.
fallback_dir = os.environ.get('PWD', '/opt/app-root/src')
dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', fallback_dir)
dotenv_path = pathlib.Path(dotenv_dir) / 'credentials.env'

# A missing file is not an error: the environment is then used as-is.
if os.path.exists(dotenv_path):
    # override=True: values from the file win over variables that are already set.
    load_dotenv(dotenv_path=dotenv_path, override=True)

import osc_ingest_trino as osc
# import pyarrow as pa
# import pyarrow.parquet as pq
# import json
import io
import uuid

Create an S3 resource for the bucket holding source data

In [2]:
import boto3
# All connection settings for the landing bucket are read from environment
# variables; a missing variable raises KeyError here rather than later.
s3_landing_settings = {
    "endpoint_url": os.environ['S3_LANDING_ENDPOINT'],
    "aws_access_key_id": os.environ['S3_LANDING_ACCESS_KEY'],
    "aws_secret_access_key": os.environ['S3_LANDING_SECRET_KEY'],
}
s3_resource = boto3.resource(service_name="s3", **s3_landing_settings)

# Handle on the bucket that holds the source spreadsheets.
bucket = s3_resource.Bucket(os.environ['S3_LANDING_BUCKET'])

In [3]:
import trino
from sqlalchemy.engine import create_engine

env_var_prefix = 'TRINO'

# Connection coordinates come from <prefix>_USER, <prefix>_HOST and <prefix>_PORT.
trino_user = os.environ[f'{env_var_prefix}_USER']
trino_host = os.environ[f'{env_var_prefix}_HOST']
trino_port = os.environ[f'{env_var_prefix}_PORT']
sqlstring = f'trino://{trino_user}@{trino_host}:{trino_port}/'

sqlargs = {
    # The value of <prefix>_PASSWD is passed to JWT authentication.
    'auth': trino.auth.JWTAuthentication(os.environ[f'{env_var_prefix}_PASSWD']),
    'http_scheme': 'https',
    # Default catalog for statements that do not name one explicitly.
    'catalog': 'osc_datacommons_dev',
}
engine = create_engine(sqlstring, connect_args=sqlargs)
connection = engine.connect()

In [4]:
# Ingestion target: tables are created as <catalog>.<schema>.<prefix><name>.
ingest_catalog = "osc_datacommons_dev"
ingest_schema = "sandbox"
# Prefix prepended to every table name created by this notebook.
sfi_table_prefix = "sfi_"

In [5]:
# Sanity check of the Trino connection: list every schema in the ingest catalog.
schema_read = engine.execute(f'show schemas in {ingest_catalog}')
schema_rows = schema_read.fetchall()
for row in schema_rows:
    print(row)

('default',)
('demo_dv',)
('iceberg_demo',)
('information_schema',)
('pcaf_sovereign_footprint',)
('sandbox',)


In [6]:
# One random UUID (as a string) identifies this ingestion run.
ingest_uuid = str(uuid.uuid4())

# Key names used for the custom metadata entries.
custom_meta_key_fields = "metafields"
custom_meta_key = "metaset"

# Disabled: creating a dedicated schema instead of using the sandbox.
# schemaname = 'sfi_geoasset'
# cur.execute(f"create schema if not exists {schemaname}")
# cur.fetchall()

In [7]:
def create_sfi_table(tablename, df, partitioning=''):
    """Drop, recreate and populate one SFI table in Trino from a DataFrame.

    The target table is
    ``{ingest_catalog}.{ingest_schema}.{sfi_table_prefix}{tablename}``.
    Any existing table of that name is dropped first, so each call replaces
    the table with the rows of ``df``.

    Parameters
    ----------
    tablename : str
        Table name without the ``sfi_table_prefix``.
    df : pandas.DataFrame
        Data to load. The column definitions of the new table are generated
        from it by ``osc.create_table_schema_pairs``.
    partitioning : str, optional
        SQL expression inserted verbatim as the ``partitioning`` table
        property, e.g. ``"array['year']"``. When empty (the default) the
        property is omitted.

    Notes
    -----
    All names are interpolated directly into the SQL text, so this must only
    be called with trusted, notebook-defined values.
    """
    ingest_table = f'{sfi_table_prefix}{tablename}'
    # Use one fully qualified name for both DROP and CREATE so the two
    # statements can never address tables in different catalogs.
    qualified_table = f'{ingest_catalog}.{ingest_schema}.{ingest_table}'
    columnschema = osc.create_table_schema_pairs(df, typemap={"datetime64[ns]":"timestamp(6)",
                                                              "datetime64[ns, UTC]":"timestamp(6) with time zone"})

    drop_table = engine.execute(f"drop table if exists {qualified_table}")
    drop_table.fetchall()

    # Empty string when no partitioning was requested; the template line is
    # then left blank, exactly as before.
    partition_clause = f"partitioning = {partitioning}," if partitioning else ""
    tabledef = f"""
create table if not exists {qualified_table}(
{columnschema}
) with (
{partition_clause}
format = 'ORC'
)
"""
    print(tabledef)
    qres = engine.execute(tabledef)
    print(qres.fetchall())
    # to_sql is given only the schema, so the insert goes to the engine's
    # default catalog; keep that in sync with ingest_catalog.
    df.to_sql(ingest_table,
              con=engine, schema=ingest_schema, if_exists='append',
              index=False,
              method=osc.TrinoBatchInsert(batch_size = 10000, verbose = True))

Load SFI/GeoAsset data file using pandas *read_excel* and using *ingest_uuid* as the global UUID for this ingestion

Note that `utc=True` (timezone-aware timestamps) doesn't work in our Iceberg yet, so the `year` column is parsed with `utc=False`.

In [8]:
import pandas as pd
import numpy as np

# Pull the cement workbook from the landing bucket into an in-memory buffer.
bObj = bucket.Object('SFI_GeoAsset/SFI-Global-Cement-Database-July-2021.xlsx')
cement_bytes = io.BytesIO(bObj.get()['Body'].read())
# Record when the source object was last modified, as an ISO-format string.
timestamp = bObj.last_modified.isoformat()

cement_df = pd.read_excel(
    cement_bytes,
    sheet_name='SFI_ALD_Cement_Database',
    dtype={'latitude': 'float64', 'longitude': 'float64'},
    engine='openpyxl',
)
# Parse the year values into timestamps; utc=False is set deliberately.
cement_df['year'] = pd.to_datetime(cement_df['year'], format='%Y', utc=False)
# Add a constant units column immediately to the right of "capacity".
units_position = cement_df.columns.get_loc("capacity") + 1
cement_df.insert(units_position, "capacity_units", "megaCement_ton")
# Switch to pandas' nullable dtypes (string, Int64, Float64, ...).
cement_df = cement_df.convert_dtypes()

In [9]:
# (Re)create the cement table, partitioned on the year column, and load it.
create_sfi_table('cement', cement_df, partitioning="array['year']")


create table if not exists osc_datacommons_dev.sandbox.sfi_cement(
    uid varchar,
    city varchar,
    state varchar,
    country varchar,
    iso3 varchar,
    country_code bigint,
    region varchar,
    sub_region varchar,
    latitude double,
    longitude double,
    accuracy varchar,
    status varchar,
    plant_type varchar,
    production_type varchar,
    capacity double,
    capacity_units varchar,
    capacity_source varchar,
    year timestamp(6),
    owner_permid bigint,
    owner_name varchar,
    owner_source varchar,
    parent_permid bigint,
    parent_name varchar,
    ownership_stake double,
    parent_lei varchar,
    parent_holding_status varchar,
    parent_ticker varchar,
    parent_exchange varchar,
    parent_permid_2 bigint,
    parent_name_2 varchar,
    ownership_stake_2 double,
    parent_lei_2 varchar,
    parent_holding_status_2 varchar,
    parent_ticker_2 varchar,
    parent_exchange_2 varchar
) with (
partitioning = array['year'],
format = 'ORC'
)

In [10]:
# Print each column of the cement frame with its non-null count and dtype.
cement_df.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3117 entries, 0 to 3116
Data columns (total 35 columns):
 #   Column                   Non-Null Count  Dtype         
---  ------                   --------------  -----         
 0   uid                      3117 non-null   string        
 1   city                     3111 non-null   string        
 2   state                    3055 non-null   string        
 3   country                  3117 non-null   string        
 4   iso3                     3117 non-null   string        
 5   country_code             3116 non-null   Int64         
 6   region                   3117 non-null   string        
 7   sub_region               3117 non-null   string        
 8   latitude                 3117 non-null   Float64       
 9   longitude                3117 non-null   Float64       
 10  accuracy                 3117 non-null   string        
 11  status                   3117 non-null   string        
 12  plant_type               2980 non-

In [11]:
# Read the workbook's "About" sheet without a header row so that cells can be
# addressed purely by position.
about_sheet = pd.read_excel(cement_bytes, sheet_name='About', header=None)

# Dataset-level metadata is taken from fixed rows of the first column.
cement_meta_content = {
    'title': about_sheet.iloc[0, 0],
    'license': about_sheet.iloc[2, 0],
    'description': about_sheet.iloc[4, 0],
    'citation': about_sheet.iloc[6, 0],
    'contact': about_sheet.iloc[8, 0],
    'release_date': 'July 2021',
    # TODO: How should we describe our transformative step here?
}

# Columns 1 and 2 hold (field name, description) pairs. After dropping
# incomplete rows and transposing, row 0 is the names and row 1 the descriptions.
field_pairs = about_sheet.iloc[:, [1, 2]].dropna(axis=0).T
# Build a single-row frame: one column per field name, holding its description.
df = pd.DataFrame(index=[0], columns=field_pairs.iloc[0])
df.loc[0] = field_pairs.iloc[1].values
df

1,uid,city,state,country,iso3,country_code,region,sub_region,latitude,longitude,...,parent_holding_status,parent_ticker,parent_exchange,parent_permid_2,parent_name_2,ownership_stake_2,parent_lei_2,parent_holding_status_2,parent_ticker_2,parent_exchange_2
0,Unique identifier for the cement plant,City in which the plant is located,State or province in which the plant is located,Country in which the plant is located,Three-letter country code defined in ISO 3166-...,Three-digit country code defined in ISO 3166-1...,Region in which the plant is located,Subregion in which the plant is located,Latitude for the geolocation of the plant (bas...,Longitude for the geolocation of the plant (ba...,...,The holding status of the ultimate parent (Pri...,"The primary ticker for the ultimate parent, if...","The primary exchange for the ultimate parent, ...",PermID of the 2nd ultimate parent of the owner...,Name of the 2nd ultimate parent of the owner o...,The percentage ownership attributed to the 2nd...,Legal Entity Identifier (LEI) of the 2nd ultim...,The holding status of the 2nd ultimate parent ...,The primary ticker for the 2nd ultimate parent...,The primary exchange for the 2nd ultimate pare...


In [12]:
# One dict per row of df, mapping each field name to its description.
cement_meta_fields = df.to_dict(orient='records')

In [13]:
# Display the cement field descriptions for visual inspection.
cement_meta_fields

[{'uid': 'Unique identifier for the cement plant',
  'city': 'City in which the plant is located',
  'state': 'State or province in which the plant is located',
  'country': 'Country in which the plant is located',
  'iso3': 'Three-letter country code defined in ISO 3166-1 alpha 3',
  'country_code': 'Three-digit country code defined in ISO 3166-1 numeric',
  'region': 'Region in which the plant is located',
  'sub_region': 'Subregion in which the plant is located',
  'latitude': 'Latitude for the geolocation of the plant (based on WGS84 (EPSG:4326))',
  'longitude': 'Longitude for the geolocation of the plant (based on WGS84 (EPSG:4326))',
  'accuracy': 'The accuracy of the latitude and longitude',
  'status': 'Current plant operating status',
  'plant_type': 'The type of cement plant (Integrated or Grinding)',
  'production_type': 'The production process used to produce the clinker at Integrated plants (Wet or Dry)',
  'capacity': 'Total cement production capacity (millions of tons)'

In [14]:
# Show the cement column names for comparison with the steel frame printed below.
display(cement_df.columns)

# Pull the steel workbook from the landing bucket into an in-memory buffer.
bObj = bucket.Object('SFI_GeoAsset/SFI-Global-Steel-Database-July-2021.xlsx')
steel_bytes = io.BytesIO(bObj.get()['Body'].read())
# Keep the greater of the two ISO-format modification timestamps.
timestamp = max(timestamp, bObj.last_modified.isoformat())

steel_df = pd.read_excel(
    steel_bytes,
    sheet_name='SFI_ALD_Steel_Database',
    dtype={'latitude': 'float64', 'longitude': 'float64'},
    engine='openpyxl',
)
# Parse the year values into timestamps; utc=False is set deliberately.
steel_df['year'] = pd.to_datetime(steel_df['year'], format='%Y', utc=False)
# Add a constant units column immediately to the right of "capacity".
units_position = steel_df.columns.get_loc("capacity") + 1
steel_df.insert(units_position, "capacity_units", "megaFe_ton")
# Switch to pandas' nullable dtypes (string, Int64, Float64, ...).
steel_df = steel_df.convert_dtypes()
steel_df.info(verbose=True)

Index(['uid', 'city', 'state', 'country', 'iso3', 'country_code', 'region',
       'sub_region', 'latitude', 'longitude', 'accuracy', 'status',
       'plant_type', 'production_type', 'capacity', 'capacity_units',
       'capacity_source', 'year', 'owner_permid', 'owner_name', 'owner_source',
       'parent_permid', 'parent_name', 'ownership_stake', 'parent_lei',
       'parent_holding_status', 'parent_ticker', 'parent_exchange',
       'parent_permid_2', 'parent_name_2', 'ownership_stake_2', 'parent_lei_2',
       'parent_holding_status_2', 'parent_ticker_2', 'parent_exchange_2'],
      dtype='object')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1598 entries, 0 to 1597
Data columns (total 36 columns):
 #   Column                   Non-Null Count  Dtype         
---  ------                   --------------  -----         
 0   uid                      1598 non-null   string        
 1   city                     1593 non-null   string        
 2   state                    1553 non-null   string        
 3   country                  1598 non-null   string        
 4   iso3                     1598 non-null   string        
 5   country_code             1598 non-null   Int64         
 6   region                   1598 non-null   string        
 7   sub_region               1598 non-null   string        
 8   latitude                 1598 non-null   Float64       
 9   longitude                1598 non-null   Float64       
 10  accuracy                 1598 non-null   string        
 11  status                   1598 non-null   string        
 12  plant_type               669 non-n

In [15]:
# (Re)create the steel table, partitioned on the year column, and load it.
create_sfi_table('steel', steel_df, partitioning="array['year']")


create table if not exists osc_datacommons_dev.sandbox.sfi_steel(
    uid varchar,
    city varchar,
    state varchar,
    country varchar,
    iso3 varchar,
    country_code bigint,
    region varchar,
    sub_region varchar,
    latitude double,
    longitude double,
    accuracy varchar,
    status varchar,
    plant_type varchar,
    primary_production_type varchar,
    primary_product varchar,
    capacity double,
    capacity_units varchar,
    capacity_source varchar,
    year timestamp(6),
    owner_permid bigint,
    owner_name varchar,
    owner_source varchar,
    parent_permid bigint,
    parent_name varchar,
    ownership_stake double,
    parent_lei varchar,
    parent_holding_status varchar,
    parent_ticker varchar,
    parent_exchange varchar,
    parent_permid_2 bigint,
    parent_name_2 varchar,
    ownership_stake_2 double,
    parent_lei_2 varchar,
    parent_holding_status_2 varchar,
    parent_ticker_2 varchar,
    parent_exchange_2 varchar
) with (
partitioni

In [16]:
# Read the workbook's "About" sheet without a header row so that cells can be
# addressed purely by position.
about_sheet = pd.read_excel(steel_bytes, sheet_name='About', header=None)

# Dataset-level metadata is taken from fixed rows of the first column.
steel_meta_content = {
    'title': about_sheet.iloc[0, 0],
    'license': about_sheet.iloc[2, 0],
    'description': about_sheet.iloc[4, 0],
    'citation': about_sheet.iloc[6, 0],
    'contact': about_sheet.iloc[8, 0],
    'release_date': 'July 2021',
    # TODO: How should we describe our transformative step here?
}

# Columns 1 and 2 hold (field name, description) pairs. After dropping
# incomplete rows and transposing, row 0 is the names and row 1 the descriptions.
field_pairs = about_sheet.iloc[:, [1, 2]].dropna(axis=0).T
# Build a single-row frame: one column per field name, holding its description.
df = pd.DataFrame(index=[0], columns=field_pairs.iloc[0])
df.loc[0] = field_pairs.iloc[1].values
df

1,uid,city,state,country,iso3,country_code,region,sub_region,latitude,longitude,...,parent_holding_status,parent_ticker,parent_exchange,parent_permid_2,parent_name_2,ownership_stake_2,parent_lei_2,parent_holding_status_2,parent_ticker_2,parent_exchange_2
0,Unique identifier for the plant,City in which the plant is located,State or province in which the plant is located,Country in which the plant is located,Three-letter country code defined in ISO 3166-...,Three-digit country code defined in ISO 3166-1...,Region in which the plant is located,Subregion in which the plant is located,Latitude for the geolocation of the plant (bas...,Longitude for the geolocation of the plant (ba...,...,The holding status of the ultimate parent (Pri...,"The primary ticker for the ultimate parent, if...","The primary exchange for the ultimate parent, ...",PermID of the 2nd ultimate parent of the owner...,Name of the 2nd ultimate parent of the owner o...,The percentage ownership attributed to the 2nd...,Legal Entity Identifier (LEI) of the 2nd ultim...,The holding status of the 2nd ultimate parent ...,The primary ticker for the 2nd ultimate parent...,The primary exchange for the 2nd ultimate pare...


In [17]:
# One dict per row of df, mapping each field name to its description.
steel_meta_fields = df.to_dict(orient='records')

In [18]:
# Display the steel field descriptions for visual inspection.
steel_meta_fields

[{'uid': 'Unique identifier for the plant',
  'city': 'City in which the plant is located',
  'state': 'State or province in which the plant is located',
  'country': 'Country in which the plant is located',
  'iso3': 'Three-letter country code defined in ISO 3166-1 alpha 3',
  'country_code': 'Three-digit country code defined in ISO 3166-1 numeric',
  'region': 'Region in which the plant is located',
  'sub_region': 'Subregion in which the plant is located',
  'latitude': 'Latitude for the geolocation of the plant (based on WGS84 (EPSG:4326))',
  'longitude': 'Longitude for the geolocation of the plant (based on WGS84 (EPSG:4326))',
  'accuracy': 'The accuracy of the latitude and longitude',
  'status': 'Current plant operating status',
  'plant_type': 'The type of iron and steel production facility. Plant types include: Integrated, Mini-Mill, DRI, Downstream, Coke, and Pelletisation plants.',
  'primary_production_type': 'The primary production type used at the plant',
  'primary_pro