# Extracting, Transforming, and Loading Environmental Data into Postgresql

In [5]:
# !pip install psycopg2 sqlalchemy

In [157]:
import pandas as pd
import numpy as np
import psycopg2
from sqlalchemy import create_engine, text

## Connect to Postgres Database

In [93]:
db_name = "environment"
db_user = "isaiaherb"
db_password = "sharkboy1"
db_host = "localhost"
db_port = "5432"  
engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

## Extract 

In [228]:
df1 = pd.read_csv('Agricultural Land.csv')
df2 = pd.read_csv('co2_emissions_kt_by_country.csv')
df3 = pd.read_csv('CO2_Emissions.csv')
df4 = pd.read_csv('Consumption of fertilizers per unit of agricultural land area.csv')
df5 = pd.read_csv('Forest Area.csv')
df6 = pd.read_csv('Governance.csv')
df7 = pd.read_csv('Public Water Supply.csv')
df8 = pd.read_csv('Terrestrial protected areas.csv')

## Transform and Load 

In [229]:
def clean_int(value):
    try:
        return int(value.replace(' ', ''))
    except (ValueError, AttributeError):
        return None

### Country  

In [232]:
dataframes = [df1, df2, df3, df4, df5, df6, df7, df8]
unique_countries = set()
for df in dataframes:
    if 'country_name' in df.columns:
        unique_countries.update(df['country_name'].unique())
    elif 'Country' in df.columns:
        unique_countries.update(df['Country'].unique())

df_country = pd.DataFrame({'country_name': list(unique_countries)})

df_country.sort_values(by='country_name', inplace=True)
df_country.reset_index(drop=True, inplace=True)

In [233]:
with engine.connect() as conn:
    try:
        for index, row in df_country.iterrows():
            conn.execute(
                text("INSERT INTO country (country_name) VALUES (:country_name) ON CONFLICT (country_name) DO NOTHING"),
                {"country_name": row['country_name']}
            )
        print("Data loaded into 'country' table successfully.")
    except Exception as e:
        print(f"Error: {e}")

Data loaded into 'country' table successfully.


### Agriculture

In [235]:
new_names = {'Country': 'country_name', 
             'Agricultural area in 2013 (km2)': 'total_area_2013', 
             '% change of agricultural area since 1990': 'total_area_percent_change_since_1990',
             '% of total land area covered by agricultural area in 2013':'percent_total_land_covered_by_agriculture_2013',
             'Arable land in 2013 (km2)':'arable_land_2013',
             'Permanent crops in 2013 (km2)':'permanent_crops_2013',
             'Permanent meadows and pastures in 2013 (km2)':'permanent_meadows_pastures_2013',
             'Agricultural area actually irrigated in 2013 (km2)':'agricultural_area_actually_irrigated_2013',
            }
df1.rename(columns=new_names, inplace=True)

In [237]:
with engine.connect() as conn:
    try:
        for index, row in df1.iterrows():
            conn.execute(
                text("INSERT INTO agricultural_land (country_name, total_area_2013, total_area_percent_change_since_1990, percent_total_land_covered_by_agriculture_2013, arable_land_2013, permanent_crops_2013, permanent_meadows_pastures_2013, agricultural_area_actually_irrigated_2013) VALUES (:country_name, :total_area_2013, :total_area_percent_change_since_1990, :percent_total_land_covered_by_agriculture_2013, :arable_land_2013, :permanent_crops_2013, :permanent_meadows_pastures_2013, :agricultural_area_actually_irrigated_2013)"),
                {
                    "country_name": row['country_name'],
                    "total_area_2013": row['total_area_2013'],
                    "total_area_percent_change_since_1990": row['total_area_percent_change_since_1990'],
                    "percent_total_land_covered_by_agriculture_2013": row['percent_total_land_covered_by_agriculture_2013'],
                    "arable_land_2013": row['arable_land_2013'],
                    "permanent_crops_2013": row['permanent_crops_2013'],
                    "permanent_meadows_pastures_2013": row['permanent_meadows_pastures_2013'],
                    "agricultural_area_actually_irrigated_2013": row['agricultural_area_actually_irrigated_2013']
                }
            )
        print("Data loaded into 'agricultural_land' table successfully.")
    except Exception as e:
        print(f"Error: {e}")

Data loaded into 'agricultural_land' table successfully.


### CO2 Emissions 

In [238]:
with engine.connect() as conn:
    try:
        for index, row in df2.iterrows():
            conn.execute(
                text("INSERT INTO co2_emissions (country_name, year, value) VALUES (:country_name, :year, :value)"),
                {"country_name": row['country_name'],
                 "year": row['year'],
                 "value": row['value']
                }
            )
        print("Data loaded into 'co2_emissions' table successfully.")
    except Exception as e:
        print(f"Error: {e}")

Data loaded into 'co2_emissions' table successfully.


### CO2 Emissions Descriptive

In [239]:
df3 = df3.drop(columns=['Unnamed: 3','Unnamed: 4','Unnamed: 5',
                        'Unnamed: 6','Unnamed: 7','Unnamed: 8',
                        'Unnamed: 9','Unnamed: 10','Unnamed: 11',
                        'Unnamed: 12','Unnamed: 13','Unnamed: 14',
                        'Unnamed: 15','Unnamed: 16','Unnamed: 17',
                        'Unnamed: 18','Unnamed: 19','Unnamed: 20',
                        'Unnamed: 21','Unnamed: 22','Unnamed: 23',
                        'Unnamed: 24','Unnamed: 25','Unnamed: 26',
                        'Unnamed: 27','Unnamed: 28','Unnamed: 29',
                        'Unnamed: 30','Unnamed: 31'],axis=1)
df3 = df3.iloc[1:]

In [240]:
new_names = {
    'Country':'country_name',
    'CO2 emissions, latest year':'co2_emissions_latest_year',
    '% change since 1990':'percent_change_since_1990',
    'CO2 emissions \nper capita, \nlatest year':'co2_emissions_per_capita_latest_year'}
df3.rename(columns=new_names, inplace=True)

In [243]:
with engine.connect() as conn:
    try:
        for index, row in df3.iterrows():
            conn.execute(
                text("INSERT INTO co2_emissions_add (country_name, co2_emissions_latest_year, percent_change_since_1990, co2_emissions_per_capita_latest_year) VALUES (:country_name, :co2_emissions_latest_year, :percent_change_since_1990, :co2_emissions_per_capita_latest_year)"),
                {"country_name": row['country_name'],
                "co2_emissions_latest_year": row['co2_emissions_latest_year'],
                "percent_change_since_1990":row['percent_change_since_1990'], 
                "co2_emissions_per_capita_latest_year": row['co2_emissions_per_capita_latest_year']
                }
            )
        print("Data loaded into 'co2_emissions_add' table successfully.")
    except Exception as e:
        print(f"Error: {e}")

Data loaded into 'co2_emissions_add' table successfully.


### Fertilizer Consumption

In [244]:
new_names = {
    'Country': 'country_name',
    'Nitrogen': 'nitrogen',
    'Phosphate': 'phosphate',
    'Potash': 'potash'
}
df4.rename(columns=new_names, inplace=True)

In [245]:
with engine.connect() as conn:
    try:
        for index, row in df4.iterrows():
            conn.execute(
                text("INSERT INTO fertilizer_consumption (country_name, nitrogen, phosphate, potash) VALUES (:country_name, :nitrogen, :phosphate, :potash)"),
                {
                    "country_name": row['country_name'],
                    "nitrogen": row['nitrogen'],
                    "phosphate": row['phosphate'],
                    "potash": row['potash']
                }
            )
        print("Data loaded into 'fertilizer_consumption' table successfully.")
    except Exception as e:
        print(f"Error: {e}")

Data loaded into 'fertilizer_consumption' table successfully.


### Forest

In [266]:
new_names = {
    'Country and Area': 'country_name',
    'Forest Area, 1990 (1000 ha)':'forest_area_1990_ha',
    'Forest Area, 2000 (1000 ha)':'forest_area_2000_ha',
    'Forest Area, 2010 (1000 ha)':'forest_area_2010_ha',
    'Forest Area, 2015 (1000 ha)':'forest_area_2015_ha',
    'Forest Area, 2020 (1000 ha)':'forest_area_2020_ha',
    'Total Land Area, 2020 (1000 ha)':'total_land_area_2020_ha',
    'Forest Area as a  Proportion of (%)\nTotal Land Area, 2020':'forest_area_proportion_total_land_area_2020',
    'Deforestation, \n2015-2020 (1000 ha/year)':'deforestation_2015_2020_ha_per_year',
    'Total Forest Area \nAffected by Fire, 2015 (1000 ha)':'total_forest_area_affected_by_fire_2015_ha'
}
df5.rename(columns=new_names,inplace=True)

In [276]:
with engine.connect() as conn:
    try:
        for index, row in df5.iterrows():
            conn.execute(
                text("INSERT INTO forest_area (country_name, forest_area_1990_ha, forest_area_2000_ha, forest_area_2010_ha, forest_area_2015_ha, forest_area_2020_ha, total_land_area_2020_ha, forest_area_proportion_total_land_area_2020, deforestation_2015_2020_ha_per_year, total_forest_area_affected_by_fire_2015_ha) VALUES (:country_name, :forest_area_1990_ha, :forest_area_2000_ha, :forest_area_2010_ha, :forest_area_2015_ha, :forest_area_2020_ha, :total_land_area_2020_ha, :forest_area_proportion_total_land_area_2020, :deforestation_2015_2020_ha_per_year, :total_forest_area_affected_by_fire_2015_ha)"),
                {
                    "country_name": row['country_name'],
                    "forest_area_1990_ha": row['forest_area_1990_ha'],
                    "forest_area_2000_ha": row['forest_area_2000_ha'],
                    "forest_area_2010_ha": row['forest_area_2010_ha'],
                    "forest_area_2015_ha": row['forest_area_2015_ha'],
                    "forest_area_2020_ha": row['forest_area_2020_ha'],
                    "total_land_area_2020_ha": row['total_land_area_2020_ha'],
                    "forest_area_proportion_total_land_area_2020": row['forest_area_proportion_total_land_area_2020'],
                    "deforestation_2015_2020_ha_per_year": row['deforestation_2015_2020_ha_per_year'],
                    "total_forest_area_affected_by_fire_2015_ha": row['total_forest_area_affected_by_fire_2015_ha']
                }
            )
        print("Data loaded into 'forest_area' table successfully.")
    except Exception as e:
        print(f"Error: {e}")

Data loaded into 'forest_area' table successfully.


### Governance

In [251]:
new_names = {
    'Country and area': 'country_name',
    'Basel Convention': 'basel_convention',
    'CITES': 'cites',
    'Convention on Biological Diversity': 'convention_on_biological_diversity',
    'Convention on Migratory Species': 'convention_on_migratory_species',
    'Kyoto \nProtocol': 'kyoto_protocol',
    'Montreal Protocol': 'montreal_protocol',
    'Paris Agreement': 'paris_agreement',
    'Ramsar Convention': 'ramsar_convention',
    'Rotterdam Convention': 'rotterdam_convention',
    'Stockholm Convention': 'stockholm_convention',
    'UN Convention on the Law of the Sea': 'un_convention_on_the_law_of_the_sea',
    'UN Convention to Combat Desertification': 'un_convention_to_combat_desertification',
    'UN Framework Convention on Climate Change': 'un_framework_convention_on_climate_change',
    'World \nHeritage Convention': 'world_heritage_convention'
}

df6.rename(columns=new_names, inplace=True)

In [252]:
with engine.connect() as conn:
    try:
        for index, row in df6.iterrows():
            conn.execute(
                text("INSERT INTO governance (country_name, basel_convention, cites, convention_on_biological_diversity, convention_on_migratory_species, kyoto_protocol, montreal_protocol, paris_agreement, ramsar_convention, rotterdam_convention, stockholm_convention, un_convention_on_the_law_of_the_sea, un_convention_to_combat_desertification, un_framework_convention_on_climate_change, world_heritage_convention) VALUES (:country_name, :basel_convention, :cites, :convention_on_biological_diversity, :convention_on_migratory_species, :kyoto_protocol, :montreal_protocol, :paris_agreement, :ramsar_convention, :rotterdam_convention, :stockholm_convention, :un_convention_on_the_law_of_the_sea, :un_convention_to_combat_desertification, :un_framework_convention_on_climate_change, :world_heritage_convention)"),
                {
                    "country_name": row['country_name'],
                    "basel_convention": row['basel_convention'],
                    "cites": row['cites'],
                    "convention_on_biological_diversity": row['convention_on_biological_diversity'],
                    "convention_on_migratory_species": row['convention_on_migratory_species'],
                    "kyoto_protocol": row['kyoto_protocol'],
                    "montreal_protocol": row['montreal_protocol'],
                    "paris_agreement": row['paris_agreement'],
                    "ramsar_convention": row['ramsar_convention'],
                    "rotterdam_convention": row['rotterdam_convention'],
                    "stockholm_convention": row['stockholm_convention'],
                    "un_convention_on_the_law_of_the_sea": row['un_convention_on_the_law_of_the_sea'],
                    "un_convention_to_combat_desertification": row['un_convention_to_combat_desertification'],
                    "un_framework_convention_on_climate_change": row['un_framework_convention_on_climate_change'],
                    "world_heritage_convention": row['world_heritage_convention']
                }
            )
        print("Data loaded into 'governance' table successfully.")
    except Exception as e:
        print(f"Error: {e}")

Data loaded into 'governance' table successfully.


### Water Supply

In [253]:
new_names = {
    'Country': 'country_name',
    'latest year available': 'latest_year_available',
    'Net freshwater supplied by water supply industry (mio m3)': 'net_freshwater_supplied_by_water_supply_industry_mio_m3',
    'Net freshwater supplied by water supply industry per capita (m3)': 'net_freshwater_supplied_by_water_supply_industry_per_capita_m3',
    'latest year available.1': 'latest_year_available_1',
    'Total population supplied by water supply industry (%)': 'total_population_supplied_by_water_supply_industry_percent',
    'Net freshwater supplied by water supply industry per capita connected (m3)': 'net_freshwater_supplied_by_water_supply_industry_per_capita'
}

df7.drop(columns=[col for col in df7.columns if 'Footnotes' in col], inplace=True)

df7.rename(columns=new_names, inplace=True)

In [254]:
with engine.connect() as conn:
    try:
        for index, row in df7.iterrows():
            conn.execute(
                text("INSERT INTO water_supply (country_name, latest_year_available, net_freshwater_supplied_by_water_supply_industry_mio_m3, net_freshwater_supplied_by_water_supply_industry_per_capita_m3, total_population_supplied_by_water_supply_industry_percent, net_freshwater_supplied_by_water_supply_industry_per_capita) VALUES (:country_name, :latest_year_available, :net_freshwater_supplied_by_water_supply_industry_mio_m3, :net_freshwater_supplied_by_water_supply_industry_per_capita_m3, :total_population_supplied_by_water_supply_industry_percent, :net_freshwater_supplied_by_water_supply_industry_per_capita)"),
                {
                    "country_name": row['country_name'],
                    "latest_year_available": row['latest_year_available'],
                    "net_freshwater_supplied_by_water_supply_industry_mio_m3": row['net_freshwater_supplied_by_water_supply_industry_mio_m3'],
                    "net_freshwater_supplied_by_water_supply_industry_per_capita_m3": row['net_freshwater_supplied_by_water_supply_industry_per_capita_m3'],
                    "total_population_supplied_by_water_supply_industry_percent": row['total_population_supplied_by_water_supply_industry_percent'],
                    "net_freshwater_supplied_by_water_supply_industry_per_capita": row['net_freshwater_supplied_by_water_supply_industry_per_capita']
                }
            )
        print("Data loaded into 'water_supply' table successfully.")
    except Exception as e:
        print(f"Error: {e}")

Data loaded into 'water_supply' table successfully.


### Protected Areas

In [255]:
new_names = {
    'Country and area': 'country_name',
    'latest year available': 'latest_year_available',
    'Terrestrial protected areas': 'percent_protected_areas'
}
df8.rename(columns=new_names, inplace=True)

In [260]:
with engine.connect() as conn:
    try:
        for index, row in df8.iterrows():
            conn.execute(
                text("INSERT INTO protected_areas (country_name, latest_year_available, percent_protected_areas) VALUES (:country_name, :latest_year_available, :percent_protected_areas)"),
                {
                    "country_name": row['country_name'],
                    "latest_year_available": row['latest_year_available'],
                    "percent_protected_areas": row['percent_protected_areas']
                }
            )
        print("Data loaded into 'protected_areas' table successfully.")
    except Exception as e:
        print(f"Error: {e}")

Data loaded into 'protected_areas' table successfully.


In [None]:
conn.close()
print("PostgreSQL connection is closed.")