In [7]:
import pandas as pd
import numpy as np
import os
import requests
import openpyxl
from datetime import datetime, timedelta
from requests.adapters import HTTPAdapter, Retry
import logging
import time
from sqlalchemy import Column, Float, Integer, MetaData, String, Table, create_engine
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine import URL
from dotenv import load_dotenv

## For implementing Data Analysis on `Excel` spreadsheets

In [62]:
df = pd.read_excel('/Users/aanwar/Desktop/dec_project_1/Global_Economic_Monitor/app/etl_project/data/Industrial Production, constant 2010 US$, seas. adj..xlsx')
df = df.drop(df.index[0])
df.rename(columns={'Unnamed: 0': 'Year'}, inplace=True)
df.fillna(method='ffill', inplace=True)

## Doing some Data Wrangling:

In [18]:
country_columns = df.columns[1:]  # Exclude 'Year'

# Automate table creation and data insertion for each country
for country in country_columns:
    # Create a DataFrame for the current country
    country_df = df[['Year', country]].copy()
    country_df.columns = ['year', 'gdp']  # Rename columns for consistency

    # Create a table for the country (if it doesn't exist) and insert data
    country_name = country.lower().replace(' ', '_')

# Data Engineering via API Call

## My logging function:

In [9]:
import logging
import time


def setup_pipeline_logging(pipeline_name: str, log_folder_path: str):
    # Initialize logger
    logger = logging.getLogger(pipeline_name)
    logger.setLevel(logging.INFO)

    # Create log file path
    file_path = f"{log_folder_path}/{pipeline_name}_{time.time()}.log"

    # Create handlers
    file_handler = logging.FileHandler(file_path)
    stream_handler = logging.StreamHandler()

    # Set logging levels
    file_handler.setLevel(logging.INFO)
    stream_handler.setLevel(logging.INFO)

    # Create formatters and add them to the handlers
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    file_handler.setFormatter(formatter)
    stream_handler.setFormatter(formatter)

    # Add handlers to the logger
    logger.addHandler(file_handler)
    logger.addHandler(stream_handler)

    return logger, file_path


def get_logs(file_path: str) -> str:
    with open(file_path, "r") as file:
        return "".join(file.readlines())

## Postgres Client Class

In [22]:
from sqlalchemy import create_engine, Table, MetaData
from sqlalchemy.engine import URL
from sqlalchemy.dialects import postgresql


class PostgreSqlClient:
    """
    A client for querying PostgreSQL database.
    """

    def __init__(
        self,
        server_name: str,
        database_name: str,
        username: str,
        password: str,
        port: int = 5432,
    ):
        self.host_name = server_name
        self.database_name = database_name
        self.username = username
        self.password = password
        self.port = port

        connection_url = URL.create(
            drivername="postgresql+pg8000",
            username=username,
            password=password,
            host=server_name,
            port=port,
            database=database_name,
        )

        self.engine = create_engine(connection_url)

    def select_all(self, table: Table) -> list[dict]:
        """
        Select all rows from the specified table.
        """
        with self.engine.connect() as conn:
            return [dict(row) for row in conn.execute(table.select()).all()]

    def create_table(self, metadata: MetaData) -> None:
        """
        Creates a table defined in the metadata object.
        """
        metadata.create_all(self.engine)

    def drop_table(self, table_name: str) -> None:
        """
        Drops a table if it exists.
        """
        with self.engine.connect() as conn:
            conn.execute(f"DROP TABLE IF EXISTS {table_name};")

    def insert(self, data: list[dict], table: Table, metadata: MetaData) -> None:
        """
        Inserts data into a table.
        """
        metadata.create_all(self.engine)
        insert_statement = postgresql.insert(table).values(data)
        with self.engine.connect() as conn:
            conn.execute(insert_statement)

    def overwrite(self, data: list[dict], table: Table, metadata: MetaData) -> None:
        """
        Drops the table and recreates it with new data.
        """
        self.drop_table(table.name)
        self.insert(data=data, table=table, metadata=metadata)

    def upsert(self, data: list[dict], table: Table, metadata: MetaData) -> None:
    
        """
        Performs an UPSERT (insert or update on conflict), ensuring no duplicate primary key entries are present in the data.
        """
        
        metadata.create_all(self.engine)

        # Convert list of dicts to DataFrame to drop duplicates
        df = pd.DataFrame(data)
        df = df.drop_duplicates(subset=['year', 'country_code'])

        # Convert back to list of dicts
        data = df.to_dict(orient="records")

        key_columns = [pk_column.name for pk_column in table.primary_key.columns.values()]

        insert_statement = postgresql.insert(table).values(data)

        # Perform conflict resolution: update non-key columns if conflict occurs
        upsert_statement = insert_statement.on_conflict_do_update(
            index_elements=key_columns,
            set_={
                col.key: col for col in insert_statement.excluded if col.key not in key_columns
            }
        )
        with self.engine.connect() as conn:
            conn.execute(upsert_statement)

## Calling API calls using incremental loads:

In [23]:
class WorldBankDataLoader:
    def __init__(self, indicator, start_year, end_year):
        """
        Initialize the WorldBankDataLoader with the indicator, start year, and end year.
        
        Args:
        - indicator (str): The indicator code for the World Bank API (e.g., 'NV.IND.TOTL.KD.ZG').
        - start_year (str): The starting year for the data.
        - end_year (str): The ending year for the data.
        """
        self.indicator = indicator
        self.date_range = f"{start_year}:{end_year}"
        self.base_url = f"https://api.worldbank.org/v2/countries/all/indicators/{self.indicator}?"
        self.params = {
            "date": self.date_range,
            "format": "json",
            "page": 1  # Start at page 1
        }
        self.all_data = []  # To store paginated data

    def fetch_data(self):
        """
        Fetch data from the World Bank API, handling pagination until all data is retrieved.
        
        Returns:
        - pd.DataFrame: A pandas DataFrame containing all the data fetched from the API.
        """
        while True:
            response = requests.get(self.base_url, params=self.params)
            response_data = response.json()

            # Check if valid data is returned
            if len(response_data) < 2 or not response_data[1]:
                break

            # Extend the all_data list with the current page's data
            self.all_data.extend(response_data[1])

            # Increment page number for the next API call
            self.params["page"] += 1

        # Normalize and return the data as a pandas DataFrame
        return pd.json_normalize(data=self.all_data)
    
    
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean and filter the data for specific countries, remove NaN values, 
        and save the result to a CSV file.
        
        Args:
        - df (pd.DataFrame): The DataFrame to be transformed.
        
        Returns:
        - pd.DataFrame: The cleaned and filtered DataFrame.
        """
        print("Starting transform for exports")

        df_selected = df[
            [
                "date",
                "countryiso3code",
                "country.value",
                "indicator.id",
                "indicator.value",
                "value",
            ]
        ]

        df_renamed = df_selected.rename(
            columns={
                "date": "year",
                "countryiso3code": "country_code",
                "country.value": "country_name",
                "indicator.id": "indicator_id",
                "indicator.value": "indicator_value",
            }
        )

        # Remove NaN from the Year and value column
        df_cleaned = df_renamed.dropna(subset=["year"]).dropna(subset=["value"])

        df_cleaned.reset_index(drop=True, inplace=True)

        df_cleaned = df_cleaned.astype({"year": int})

        print("Completed transform")

        try:
            df_cleaned.to_csv("data/industrial_cleaned_export_data.csv", index=False)
            print("Data saved successfully to cleaned_export_data.csv")
        except Exception as e:
            print(f"Error saving data to CSV: {e}")

        return df_cleaned
    
    def load(self, df: pd.DataFrame, postgresql_client, table: Table, metadata: MetaData, load_method: str = "overwrite"):
        """
        Load the cleaned data into a PostgreSQL database.
        
        Args:
        - df (pd.DataFrame): The cleaned DataFrame to be loaded.
        - postgresql_client: A PostgreSQL client instance to execute the load.
        - table (Table): The SQLAlchemy Table object representing the target table.
        - metadata (MetaData): The SQLAlchemy MetaData object representing the schema.
        - load_method (str): The method for loading data: 'insert', 'upsert', or 'overwrite'.
        """
        # Create the upsert statement based on the load method
        if load_method == "insert":
            postgresql_client.insert(
                data=df.to_dict(orient="records"), table=table, metadata=metadata
            )
        elif load_method == "upsert":
            postgresql_client.upsert(
                data=df.to_dict(orient="records"), table=table, metadata=metadata
            )
        elif load_method == "overwrite":
            postgresql_client.overwrite(
                data=df.to_dict(orient="records"), table=table, metadata=metadata
            )
        else:
            raise Exception(
                "Please specify a correct load method: [insert, upsert, overwrite]"
            )

# Example usage
if __name__ == "__main__":

    load_dotenv()
    DB_USERNAME = os.getenv("DB_USERNAME")
    DB_PASSWORD = os.getenv("DB_PASSWORD")
    SERVER_NAME = os.getenv("SERVER_NAME")
    DATABASE_NAME = os.getenv("DATABASE_NAME")
    PORT = os.environ.get("PORT")
    # Create an instance of the loader for the specific indicator and date range
    loader = WorldBankDataLoader(indicator="NV.IND.TOTL.KD.ZG", start_year="2020", end_year="2024")

    logger, log_file = setup_pipeline_logging("worldbankdata_industrial", "logs")
    logger.info("Making api connection")
    logs = get_logs(log_file)
    print(logs)

    # Fetch the data
    logger.info("Starting to fetch data")
    df_data = loader.fetch_data()
    logger.info("Data has been fetched")

    logger.info("Starting Transformation")
    logger.info("Progressing through my transformations")
    cleaned_df = loader.transform(df_data)
    logger.info("Transformations have been finished")

    logger.info("Starting to load into my Postgres")
    logger.info("gathering my postgres server info")
    postgresql_client = PostgreSqlClient(
    server_name=SERVER_NAME,
    database_name=DATABASE_NAME,
    username=DB_USERNAME,
    password=DB_PASSWORD,
    port=PORT,
    )
    logger.info("completed gathering my postgres credential info")
    logger.info("Calling MetaData and creating industrial_data table")
    metadata = MetaData()
    export_table = Table(
        "industrial_data",
        metadata,
        Column("year", Integer, primary_key=True),
        Column("country_code", String, primary_key = True),
        Column("country_name", String),
        Column("indicator_id", String),
        Column("indicator_value", String),
        Column("value", Float),
    )
    logger.info("Doing the Load:::::::>>>>>>>")
    loader.load(
        cleaned_df,
        postgresql_client=postgresql_client,
        table=export_table,
        metadata=metadata,
        load_method="upsert"
    ) 
    logger.info("Load ================>>>>>>>100 percent complete")
    logger.info("pipeline finished")

2024-10-03 21:28:00,691 - worldbankdata_industrial - INFO - Making api connection
2024-10-03 21:28:00,691 - worldbankdata_industrial - INFO - Making api connection
2024-10-03 21:28:00,691 - worldbankdata_industrial - INFO - Making api connection
2024-10-03 21:28:00,691 - worldbankdata_industrial - INFO - Making api connection
2024-10-03 21:28:00,691 - worldbankdata_industrial - INFO - Making api connection
2024-10-03 21:28:00,691 - worldbankdata_industrial - INFO - Making api connection
2024-10-03 21:28:00,694 - worldbankdata_industrial - INFO - Starting to fetch data
2024-10-03 21:28:00,694 - worldbankdata_industrial - INFO - Starting to fetch data
2024-10-03 21:28:00,694 - worldbankdata_industrial - INFO - Starting to fetch data
2024-10-03 21:28:00,694 - worldbankdata_industrial - INFO - Starting to fetch data
2024-10-03 21:28:00,694 - worldbankdata_industrial - INFO - Starting to fetch data
2024-10-03 21:28:00,694 - worldbankdata_industrial - INFO - Starting to fetch data


2024-10-03 21:28:00,691 - worldbankdata_industrial - INFO - Making api connection



2024-10-03 21:28:12,442 - worldbankdata_industrial - INFO - Data has been fetched
2024-10-03 21:28:12,442 - worldbankdata_industrial - INFO - Data has been fetched
2024-10-03 21:28:12,442 - worldbankdata_industrial - INFO - Data has been fetched
2024-10-03 21:28:12,442 - worldbankdata_industrial - INFO - Data has been fetched
2024-10-03 21:28:12,442 - worldbankdata_industrial - INFO - Data has been fetched
2024-10-03 21:28:12,442 - worldbankdata_industrial - INFO - Data has been fetched
2024-10-03 21:28:12,446 - worldbankdata_industrial - INFO - Starting Transformation
2024-10-03 21:28:12,446 - worldbankdata_industrial - INFO - Starting Transformation
2024-10-03 21:28:12,446 - worldbankdata_industrial - INFO - Starting Transformation
2024-10-03 21:28:12,446 - worldbankdata_industrial - INFO - Starting Transformation
2024-10-03 21:28:12,446 - worldbankdata_industrial - INFO - Starting Transformation
2024-10-03 21:28:12,446 - worldbankdata_industrial - INFO - Starting Transformation
2024

Starting transform for exports
Completed transform
Data saved successfully to cleaned_export_data.csv


In [12]:
df_data.head()

Unnamed: 0,countryiso3code,date,value,unit,obs_status,decimal,indicator.id,indicator.value,country.id,country.value
0,AFE,2023,1.9567,,,1,NV.IND.TOTL.KD.ZG,"Industry (including construction), value added...",ZH,Africa Eastern and Southern
1,AFE,2022,2.693175,,,1,NV.IND.TOTL.KD.ZG,"Industry (including construction), value added...",ZH,Africa Eastern and Southern
2,AFE,2021,4.33502,,,1,NV.IND.TOTL.KD.ZG,"Industry (including construction), value added...",ZH,Africa Eastern and Southern
3,AFE,2020,-4.734432,,,1,NV.IND.TOTL.KD.ZG,"Industry (including construction), value added...",ZH,Africa Eastern and Southern
4,AFW,2023,1.744502,,,1,NV.IND.TOTL.KD.ZG,"Industry (including construction), value added...",ZI,Africa Western and Central


In [13]:
df_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1064 entries, 0 to 1063
Data columns (total 10 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   countryiso3code  1064 non-null   object 
 1   date             1064 non-null   object 
 2   value            887 non-null    float64
 3   unit             1064 non-null   object 
 4   obs_status       1064 non-null   object 
 5   decimal          1064 non-null   int64  
 6   indicator.id     1064 non-null   object 
 7   indicator.value  1064 non-null   object 
 8   country.id       1064 non-null   object 
 9   country.value    1064 non-null   object 
dtypes: float64(1), int64(1), object(8)
memory usage: 83.2+ KB


In [14]:
df_cleaned_2 = pd.read_csv("data/cleaned_export_data.csv")

In [15]:
df_cleaned_2.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 887 entries, 0 to 886
Data columns (total 6 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   year             887 non-null    int64  
 1   country_code     872 non-null    object 
 2   country_name     887 non-null    object 
 3   indicator_id     887 non-null    object 
 4   indicator_value  887 non-null    object 
 5   value            887 non-null    float64
dtypes: float64(1), int64(1), object(4)
memory usage: 41.7+ KB
