# Power Plant Data Product: Data Loading Step 2

This notebook covers the second step of the pipeline:
* Load versioned source data from Pachyderm
* Get data into a dataframe 
* Create a Trino client and load the data

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

In [1]:
# Report exceptions only in cells output
%xmode Minimal

Exception reporting mode: Minimal


In [2]:
from dotenv import dotenv_values, load_dotenv
import os
import pathlib
import socket
from pyspark import SparkConf , SparkContext, SparkFiles
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql.types import *
import pandas as pd
from sqlalchemy import create_engine, text

Define Environment and Execution Variables

In [3]:
# Load environment variables from credentials.env
dotenv_dir = os.environ.get('CREDENTIAL_DOTENV_DIR', os.environ.get('PWD', '/workspace'))
print (dotenv_dir)
dotenv_path = pathlib.Path(dotenv_dir) / 'credentials.env'
print (dotenv_path)
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path,override=True)

/workspace
/workspace/credentials.env


Read the data directory holding raw source data pulled from DVC, and list all files

In [4]:
path = '../data/global_power_plant_database_v_1_3/'
files = os.listdir(path)
for f in files:
    print(f)

global_power_plant_database.csv
README.txt
RELEASE_NOTES.txt
A_Global_Database_of_Power_Plants.pdf
Estimating_Power_Plant_Generation_in_the_Global_Power_Plant_Database.pdf


In [5]:
srcdata = str(pathlib.Path('../data/global_power_plant_database_v_1_3/global_power_plant_database.csv').absolute())
print(srcdata)

/workspace/data-product-template/load/../data/global_power_plant_database_v_1_3/global_power_plant_database.csv


Create a connection to Spark cluster

In [6]:
SPARK_SERVER = os.environ['SPARK_SERVER']
SPARK_CONNECT_PORT = os.environ['SPARK_CONNECT_PORT']
SPARK_CONNECT_URL = f'{SPARK_SERVER}:{SPARK_CONNECT_PORT}'

In [7]:
conf = SparkConf() \
    .setAppName("spark demo") \
    .setMaster(SPARK_CONNECT_URL) \
    .set("spark.driver.host", socket.gethostbyname(socket.gethostname()))\
    .set("spark.driver.port", "10027") 

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

24/06/22 13:34:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/22 13:34:03 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/06/22 13:34:04 WARN S3ABlockOutputStream: Application invoked the Syncable API against stream writing to logs/app-20240622133403-0025.inprogress. This is unsupported


In [8]:
spark

Create a Spark dataframe from the CSV file (temporary workaround via pandas)

In [9]:
df = pd.read_csv(srcdata, low_memory=False).convert_dtypes()

In [10]:
print(df.info(verbose=True))

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 34936 entries, 0 to 34935
Data columns (total 36 columns):
 #   Column                          Non-Null Count  Dtype  
---  ------                          --------------  -----  
 0   country                         34936 non-null  string 
 1   country_long                    34936 non-null  string 
 2   name                            34936 non-null  string 
 3   gppd_idnr                       34936 non-null  string 
 4   capacity_mw                     34936 non-null  Float64
 5   latitude                        34936 non-null  Float64
 6   longitude                       34936 non-null  Float64
 7   primary_fuel                    34936 non-null  string 
 8   other_fuel1                     1944 non-null   string 
 9   other_fuel2                     276 non-null    string 
 10  other_fuel3                     92 non-null     string 
 11  commissioning_year              17447 non-null  Float64
 12  owner                           

In [11]:
df = df.astype({"year_of_capacity_data": "string"})

In [12]:
df.head()

Unnamed: 0,country,country_long,name,gppd_idnr,capacity_mw,latitude,longitude,primary_fuel,other_fuel1,other_fuel2,...,estimated_generation_gwh_2013,estimated_generation_gwh_2014,estimated_generation_gwh_2015,estimated_generation_gwh_2016,estimated_generation_gwh_2017,estimated_generation_note_2013,estimated_generation_note_2014,estimated_generation_note_2015,estimated_generation_note_2016,estimated_generation_note_2017
0,AFG,Afghanistan,Kajaki Hydroelectric Power Plant Afghanistan,GEODB0040538,33.0,32.322,65.119,Hydro,,,...,123.77,162.9,97.39,137.76,119.5,HYDRO-V1,HYDRO-V1,HYDRO-V1,HYDRO-V1,HYDRO-V1
1,AFG,Afghanistan,Kandahar DOG,WKS0070144,10.0,31.67,65.795,Solar,,,...,18.43,17.48,18.25,17.7,18.29,SOLAR-V1-NO-AGE,SOLAR-V1-NO-AGE,SOLAR-V1-NO-AGE,SOLAR-V1-NO-AGE,SOLAR-V1-NO-AGE
2,AFG,Afghanistan,Kandahar JOL,WKS0071196,10.0,31.623,65.792,Solar,,,...,18.64,17.58,19.1,17.62,18.72,SOLAR-V1-NO-AGE,SOLAR-V1-NO-AGE,SOLAR-V1-NO-AGE,SOLAR-V1-NO-AGE,SOLAR-V1-NO-AGE
3,AFG,Afghanistan,Mahipar Hydroelectric Power Plant Afghanistan,GEODB0040541,66.0,34.556,69.4787,Hydro,,,...,225.06,203.55,146.9,230.18,174.91,HYDRO-V1,HYDRO-V1,HYDRO-V1,HYDRO-V1,HYDRO-V1
4,AFG,Afghanistan,Naghlu Dam Hydroelectric Power Plant Afghanistan,GEODB0040534,100.0,34.641,69.717,Hydro,,,...,406.16,357.22,270.99,395.38,350.8,HYDRO-V1,HYDRO-V1,HYDRO-V1,HYDRO-V1,HYDRO-V1


In [13]:
# Auxiliar functions to create automatically the spark schema with type conversion
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'Int64': return LongType()
    elif f == 'Int32': return IntegerType()
    elif f == 'Float64': return DoubleType()
    elif f == 'Float32': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return spark.createDataFrame(pandas_df, p_schema)

In [14]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark_df = spark.createDataFrame(df)

24/06/22 13:34:36 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.


In [15]:
# spark_df.printSchema()
spark_df.show()
spark_df.printSchema()

24/06/22 13:34:41 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------+--------------------+------------+-----------+--------+---------+------------+-----------+-----------+-----------+------------------+--------------------+--------------------+--------------------+------------------+-------+---------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+----------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+
|country|country_long|                name|   gppd_idnr|capacity_mw|latitude|longitude|primary_fuel|other_fuel1|other_fuel2|other_fuel3|commissioning_year|               owner|              source|                 url|geolocation_source|wepp_id|year_of_capacity_da

Write the dataframe into Iceberg

In [16]:
try:
    spark_df.writeTo("wri_gppd.gppd_source").createOrReplace()
    print("Data written to Iceberg table successfully." )
except Exception as e:
    print("Error writing data to Iceberg table:", str(e))

24/06/22 13:35:24 WARN TaskSetManager: Stage 0 contains a task of very large size (2686 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Data written to Iceberg table successfully.


Connect to Trino and ensure the data is loaded

In [17]:
# Define execution variables for Iceberg / Trino
ingest_catalog = 'iceberg'
ingest_schema = 'wri_gppd'
ingest_table = 'gppd_source'

In [18]:
# Create a connection to Trino using an SQLAlchemy Engine
# Documentation: https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine
engine = create_engine('trino://' + os.environ['TRINO_USER'] + '@' + os.environ['TRINO_HOST'] + ':' + os.environ['TRINO_PORT'])

In [19]:
engine.connect

<bound method Engine.connect of Engine(trino://admin@trino:8080)>

In [20]:
# Show existing catalogs in Trino to make sure the connection works fine
with engine.connect() as connection:
    result = connection.execute(text('show catalogs'))
    for row in result:
        print(row)

('hive',)
('iceberg',)
('system',)
('tpcds',)
('tpch',)


In [21]:
# Show available schemas
schema_show_sql = f"""
show schemas in {ingest_catalog}
"""
with engine.connect() as connection:
    result = connection.execute(text(schema_show_sql))
    for row in result:
        print(row)

('default',)
('demo',)
('information_schema',)
('observability',)
('product',)
('wri_gppd',)


In [22]:
# Show available tables
tables_show_sql = f"""
show tables from {ingest_catalog}.{ingest_schema}
"""
with engine.connect() as connection:
    result = connection.execute(text(tables_show_sql))
    for row in result:
        print(row)

('gppd_power_plants_list',)
('gppd_source',)


In [23]:
# Query table to verify insertion was successful
select_query = f"""
select * from {ingest_catalog}.{ingest_schema}.{ingest_table}
"""
print(select_query)
with engine.connect() as connection:
    result = connection.execute(text(select_query)).fetchmany(10)
    for row in result:
        print(row)


select * from iceberg.wri_gppd.gppd_source

('CAN', 'Canada', 'Pointe du Bois', 'CAN0008238', 75.0, 50.2981, -95.5481, 'Hydro', None, None, None, None, None, 'Natural Resources Canada', 'ftp://ftp.maps.canada.ca/pub/nacei_cnaie/energy_infrastructure/PowerPlantsRenewGE1MW_NorthAmerica_201708.xlsx', 'Natural Resources Canada', '37141', '2017', None, None, None, None, None, None, None, None, 348.18, 386.71, 361.11, 390.03, 366.33, 'HYDRO-V1', 'HYDRO-V1', 'HYDRO-V1', 'HYDRO-V1', 'HYDRO-V1')
('CAN', 'Canada', 'Pointes aux Roches', 'CAN0008239', 48.6, 42.2841, -82.5284, 'Wind', None, None, None, None, 'Pointe-aux-Roches Wind LP', 'Natural Resources Canada', 'ftp://ftp.maps.canada.ca/pub/nacei_cnaie/energy_infrastructure/PowerPlantsRenewGE1MW_NorthAmerica_201708.xlsx', 'Natural Resources Canada', '61695', '2016', None, None, None, None, None, None, None, None, None, None, None, None, 112.75, 'NO-ESTIMATION', 'NO-ESTIMATION', 'NO-ESTIMATION', 'NO-ESTIMATION', 'CAPACITY-FACTOR-V1')
('CAN', 'Ca