<font size="5">Ingest WDI - "Population" data into Trino pipeline</font>

In [1]:
from dotenv import dotenv_values, load_dotenv
import osc_ingest_trino as osc
import os
import pathlib
import shutil

In [2]:
dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', os.environ.get('PWD', '/opt/app-root/src'))
dotenv_path = pathlib.Path(dotenv_dir) / 'credentials.env'
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path,override=True)

In [3]:
# use a catalog that is configured for iceberg

iceberg_catalog = 'osc_datacommons_dev'
iceberg_schema = 'pcaf_sovereign_footprint'
pcaf_table_prefix = ''
iceberg_table = 'sf_wdi_population'

ingest_catalog = 'osc_datacommons_hive_ingest'
ingest_schema = 'ingest'


#ingest_catalog = 'osc_datacommons_dev'
#ingest_schema = 'pcaf_sovereign_footprint'


Load Environment Variables

In [4]:
import trino
from sqlalchemy.engine import create_engine

env_var_prefix = 'TRINO'

sqlstring = 'trino://{user}@{host}:{port}/'.format(
    user = os.environ[f'{env_var_prefix}_USER'],
    host = os.environ[f'{env_var_prefix}_HOST'],
    port = os.environ[f'{env_var_prefix}_PORT']
)
sqlargs = {
    'auth': trino.auth.JWTAuthentication(os.environ[f'{env_var_prefix}_PASSWD']),
    'http_scheme': 'https',
    'catalog': 'osc_datacommons_dev'
}
engine = create_engine(sqlstring, connect_args = sqlargs)
connection = engine.connect()

trino_bucket = osc.attach_s3_bucket("S3_DEV")

In [5]:
import boto3

s3_source = boto3.resource(
    service_name="s3",
    endpoint_url=os.environ['S3_LANDING_ENDPOINT'],
    aws_access_key_id=os.environ['S3_LANDING_ACCESS_KEY'],
    aws_secret_access_key=os.environ['S3_LANDING_SECRET_KEY'],
)
source_bucket = s3_source.Bucket(os.environ['S3_LANDING_BUCKET'])

Open a Trino connection using JWT for authentication

In [6]:
# Show available schemas to ensure trino connection is set correctly
schema_read = engine.execute(f'show schemas in {ingest_catalog}')
for row in schema_read.fetchall():
    print(row)

('aicoe_osc_demo_results',)
('default',)
('demo_dv',)
('dera',)
('essd',)
('iceberg_demo',)
('information_schema',)
('ingest',)
('mdt_sandbox',)
('pcaf_sovereign_footprint',)
('sandbox',)
('wri_gppd',)


Load GDP file (updated sporadically from https://data.worldbank.org/indicator/NY.GDP.PCAP.PP.CD)

In [7]:
import pandas as pd
import openpyxl
import ParseXLS as parser



## GDP
ticker_file = s3_source.Object(os.environ['S3_LANDING_BUCKET'],'PCAF-sovereign-footprint/WDI/Population_Data.csv')
ticker_file.download_file(f'/tmp/Population_Data.csv')

df = parser.process('WDI_population.ini','WDI_population.csv') 

# year values are delivered twice . remove the unnecessary part

df[['validity_date', 'column_to_drop']] = df['validity_date'].str.split(' ', expand=True)

df.drop('column_to_drop', axis=1, inplace=True)


## GDP PPP

#df=df.astype({'validity_date': 'int32'})
df= df.convert_dtypes()
df.info(verbose=True)
df= df[['rec_source','data_provider','country_iso_code','country_name','validity_date','attribute','value','value_units']].dropna(subset=['value'])
#df = df.convert_dtypes()
print(df.info(verbose=True))


#df


WDI_population.ini
file_list:
['/tmp/Population_Data.csv']
/tmp/Population_Data.csv
2
csv
/tmp/Population_Dat
<configparser.ConfigParser object at 0x7ff07092a850>
           Series Name  Series Code  \
0    Population, total  SP.POP.TOTL   
1    Population, total  SP.POP.TOTL   
2    Population, total  SP.POP.TOTL   
3    Population, total  SP.POP.TOTL   
4    Population, total  SP.POP.TOTL   
..                 ...          ...   
261  Population, total  SP.POP.TOTL   
262  Population, total  SP.POP.TOTL   
263  Population, total  SP.POP.TOTL   
264  Population, total  SP.POP.TOTL   
265  Population, total  SP.POP.TOTL   

                                   Country Name Country Code 1990 [YR1990]  \
0                                   Afghanistan          AFG      12412311   
1                                       Albania          ALB       3286542   
2                                       Algeria          DZA      25758872   
3                                American Samoa         

In [8]:
#%run -i TransposeXLS.py --config WDI.ini --output=WDI.csv 

In [9]:
import osc_ingest_trino as osc
columnschema = osc.create_table_schema_pairs(df) 

sql = f"""
drop table if exists {iceberg_catalog}.{iceberg_schema}.{iceberg_table}
"""
print(sql)
qres = engine.execute(sql)
#print(qres.fetchall())





drop table if exists osc_datacommons_dev.pcaf_sovereign_footprint.sf_wdi_population



In [10]:
tabledef = f"""
create table if not exists {iceberg_catalog}.{iceberg_schema}.{iceberg_table}(
{columnschema}
) with (
    format = 'ORC',
    partitioning = array['validity_date']
)
"""
print(tabledef)
qres = engine.execute(tabledef)
#print(qres.fetchall())


create table if not exists osc_datacommons_dev.pcaf_sovereign_footprint.sf_wdi_population(
    rec_source varchar,
    data_provider varchar,
    country_iso_code varchar,
    country_name varchar,
    validity_date varchar,
    attribute varchar,
    value varchar,
    value_units varchar
) with (
    format = 'ORC',
    partitioning = array['validity_date']
)



In [11]:
# Delete the data for the related attribute 
sql=f"""
delete from {iceberg_catalog}.{iceberg_schema}.{iceberg_table} 
"""
qres = engine.execute(sql)
print(qres.fetchall())

[(None,)]


In [12]:
sql=f"""
select * from {iceberg_catalog}.{iceberg_schema}.{iceberg_table} 
"""
pd.read_sql(sql, engine)


Unnamed: 0,rec_source,data_provider,country_iso_code,country_name,validity_date,attribute,value,value_units


In [13]:
print(ingest_catalog)
#df=df.drop(df[df.country_name=="cote d'ivoire"].index)
df.to_sql(iceberg_table,
           con=engine,
           schema=iceberg_schema,
           if_exists='append',
           index=False,
           method=osc.TrinoBatchInsert(batch_size = 1000, verbose = True))

osc_datacommons_hive_ingest
constructed fully qualified table name as: "pcaf_sovereign_footprint.sf_wdi_population"
inserting 1000 records
  ('Population_Data.csv', 'WDI', 'AFG', 'Afghanistan', '1990', 'Population, total', '12412311', '')
  ('Population_Data.csv', 'WDI', 'ALB', 'Albania', '1990', 'Population, total', '3286542', '')
  ('Population_Data.csv', 'WDI', 'DZA', 'Algeria', '1990', 'Population, total', '25758872', '')
  ...
  ('Population_Data.csv', 'WDI', 'TUV', 'Tuvalu', '2013', 'Population, total', '10849', '')
batch insert result: [(1000,)]
inserting 1000 records
  ('Population_Data.csv', 'WDI', 'UGA', 'Uganda', '2013', 'Population, total', '35694519', '')
  ('Population_Data.csv', 'WDI', 'UKR', 'Ukraine', '2013', 'Population, total', '45489648', '')
  ('Population_Data.csv', 'WDI', 'ARE', 'United Arab Emirates', '2013', 'Population, total', '9197908', '')
  ...
  ('Population_Data.csv', 'WDI', 'NRU', 'Nauru', '2017', 'Population, total', '10577', '')
batch insert result: [

In [14]:
import pandas as pd
sql=f"""
select * from {iceberg_catalog}.{iceberg_schema}.{iceberg_table} " + " where country_iso_code='BHS' order by validity_date desc"""
pd.read_sql(sql, engine)


Unnamed: 0,rec_source,data_provider,country_iso_code,country_name,validity_date,attribute,value,value_units
0,Population_Data.csv,WDI,BHS,"Bahamas, The",2021,"Population, total",396914,
1,Population_Data.csv,WDI,BHS,"Bahamas, The",2020,"Population, total",393248,
2,Population_Data.csv,WDI,BHS,"Bahamas, The",2019,"Population, total",389486,
3,Population_Data.csv,WDI,BHS,"Bahamas, The",2018,"Population, total",385635,
4,Population_Data.csv,WDI,BHS,"Bahamas, The",2017,"Population, total",381749,
5,Population_Data.csv,WDI,BHS,"Bahamas, The",2016,"Population, total",377923,
6,Population_Data.csv,WDI,BHS,"Bahamas, The",2015,"Population, total",374200,
7,Population_Data.csv,WDI,BHS,"Bahamas, The",2014,"Population, total",370625,
8,Population_Data.csv,WDI,BHS,"Bahamas, The",2013,"Population, total",367162,
9,Population_Data.csv,WDI,BHS,"Bahamas, The",2012,"Population, total",363581,


In [15]:
dbt_dict = {}
dbt_dict['models'] = {}
hive_bucket = osc.attach_s3_bucket('S3_OSCCL2')

In [24]:
def create_trino_table_and_dbt_metadata(tablename, df, partition_columns=[], custom_meta_content='', custom_meta_fields='', verbose=False):
    iceberg_table = f'{pcaf_table_prefix}{tablename}'
    print ("iceberg_table:"+ iceberg_table)

    if custom_meta_content:
        dbt_models = dbt_dict['models']
        dbt_models[iceberg_table] = dbt_table = { 'description': custom_meta_content['description']}
        if custom_meta_fields:
            dbt_table['columns'] = dbt_columns = (
                { name: {'description': custom_meta_fields[name]['Description'] } for name in custom_meta_fields.keys() }
            )
            for name in custom_meta_fields.keys():
                if 'tags' in custom_meta_fields[name].keys():
                    dbt_columns[name]['tags'] = custom_meta_fields[name]['tags']
    elif custom_meta_fields:
        raise VALUE_ERROR

    drop_table = engine.execute(f"drop table if exists {iceberg_schema}.{iceberg_table}_source")
    #drop_table.fetchall()
    
    osc.fast_pandas_ingest_via_hive(
        df,
        engine,
        iceberg_catalog, iceberg_schema, f"{iceberg_table}_source",
        hive_bucket, ingest_catalog, ingest_schema,
        partition_columns = partition_columns,
        overwrite = True,
        typemap={'datetime64[ns]':'date'},
        verbose = verbose
    )

    with open(f"/opt/app-root/src/PCAF-sovereign-footprint/dbt/pcaf_transform/models/{iceberg_table}.sql", "w", encoding="utf-8") as f:
        print("{{ config(materialized='view', view_security='invoker') }}" + f"""
with source_data as (
    select {', '.join(df.columns)}
    from {iceberg_catalog}.{iceberg_schema}.{iceberg_table}_source
)
select * from source_data

""", file=f)


In [17]:
def description_is (s):
    return { 'Description': s}

In [18]:
shutil.rmtree("/opt/app-root/src/PCAF-sovereign-footprint/dbt/pcaf_transform/models", ignore_errors=True)
#path = "/opt/app-root/src/sovereign-footprint"
#dir_list = os.listdir(path)
#print("Files and directories in '", path, "' :") 
# print the list
#print(dir_list)

os.mkdir("/opt/app-root/src/PCAF-sovereign-footprint/dbt/pcaf_transform/models", mode=0o755)
custom_meta_content = {
    'data provider': df[['data_provider']],
     'description': 'Description',
}
custom_meta_fields= {}
custom_meta_fields['country_iso_code'] = description_is("Country ISO Code")

In [25]:
create_trino_table_and_dbt_metadata('sf_wdi_population', df, [], custom_meta_content, custom_meta_fields,verbose=True)

iceberg_table:sf_wdi_population

verifying existence of table osc_datacommons_dev.pcaf_sovereign_footprint.sf_wdi_population_source
create table if not exists osc_datacommons_dev.pcaf_sovereign_footprint.sf_wdi_population_source (
    rec_source varchar,
    data_provider varchar,
    country_iso_code varchar,
    country_name varchar,
    validity_date varchar,
    attribute varchar,
    value varchar,
    value_units varchar
) with (
    format = 'parquet'
)

staging dataframe parquet to s3 osc-datacommons-s3-bucket-dev02
/tmp/cdd5e53782f04bd5ba9c77be1ca916bc.parquet  -->  trino/ingest/ingest_temp_44d16fab/cdd5e53782f04bd5ba9c77be1ca916bc.parquet

declaring intermediate hive table osc_datacommons_hive_ingest.ingest.ingest_temp_44d16fab
create table if not exists osc_datacommons_hive_ingest.ingest.ingest_temp_44d16fab (
    rec_source varchar,
    data_provider varchar,
    country_iso_code varchar,
    country_name varchar,
    validity_date varchar,
    attribute varchar,
    value 

In [20]:
dbt_yml = open("/opt/app-root/src/PCAF-sovereign-footprint/dbt/pcaf_transform/models/pcaf_schema.yml", "w", encoding="utf-8")

In [21]:
print("version: 2", file=dbt_yml)

indent = 0
print("\nmodels:", file=dbt_yml)
indent = indent + 2
for name in dbt_dict['models']:
    model = dbt_dict['models'][name]
    print(f"{' '*indent}- name: {name}", file=dbt_yml)
    indent = indent + 2
    print(f"{' '*indent}description: {model['description']}", file=dbt_yml)
    print(f"\n{' '*indent}columns:", file=dbt_yml)
    indent = indent + 2
    columns = model['columns']
    for col in columns:
        print(f"{' '*indent}- name: {col}", file=dbt_yml)
        indent = indent + 2
        for col_meta in columns[col].keys():
            print(f"{' '*indent}{col_meta}: {columns[col][col_meta]}", file=dbt_yml)
        indent = indent - 2
    print("", file=dbt_yml) # newline comes for free...
    indent = indent - 4
indent = indent - 2
assert(indent==0)

In [22]:
dbt_yml.close()