# ETL Pipeline Automation for WorldBank GDP Data with Python

This project aims to streamline the Extract, Transform, Load (ETL) process by developing a Python script. The script efficiently extracts data from a CSV file, performs necessary transformations, and uploads the refined data to a designated SQL table in a PostgreSQL database. The primary objective is to convert the original data into a more coherent and analytically friendly format, facilitating ease of data analysis.

## Key Features

- **Extraction:** Read data from a CSV file.

- **Transformation:** Apply necessary data transformations for enhanced readability and analysis.

- **Loading:** Upload the transformed data into a PostgreSQL database.

## Why This Project?

Managing data effectively is a critical aspect of any data-driven project. This ETL pipeline automation simplifies the process, making it more accessible and efficient for users to prepare data for analysis.

In [71]:
# Import the libraries 
from sqlalchemy import create_engine
import pandas as pd
import numpy as np


In [72]:
# Original Format
pd.read_csv('Data/gdp_data.csv', skiprows=4).head()

Unnamed: 0,Country Name,Country Code,Indicator Name,Indicator Code,1960,1961,1962,1963,1964,1965,...,2009,2010,2011,2012,2013,2014,2015,2016,2017,Unnamed: 62
0,Aruba,ABW,GDP (current US$),NY.GDP.MKTP.CD,,,,,,,...,2498933000.0,2467704000.0,2584464000.0,,,,,,,
1,Afghanistan,AFG,GDP (current US$),NY.GDP.MKTP.CD,537777800.0,548888900.0,546666700.0,751111200.0,800000000.0,1006667000.0,...,12486940000.0,15936800000.0,17930240000.0,20536540000.0,20264250000.0,20616100000.0,19215560000.0,19469020000.0,20815300000.0,
2,Angola,AGO,GDP (current US$),NY.GDP.MKTP.CD,,,,,,,...,75492390000.0,82526140000.0,104115800000.0,113923200000.0,124912500000.0,126730200000.0,102621200000.0,95337200000.0,124209400000.0,
3,Albania,ALB,GDP (current US$),NY.GDP.MKTP.CD,,,,,,,...,12044210000.0,11926950000.0,12890870000.0,12319780000.0,12776280000.0,13228240000.0,11386930000.0,11883680000.0,13039350000.0,
4,Andorra,AND,GDP (current US$),NY.GDP.MKTP.CD,,,,,,,...,3660531000.0,3355695000.0,3442063000.0,3164615000.0,3281585000.0,3350736000.0,2811489000.0,2877312000.0,3012914000.0,


In [64]:
# Create the connection to the database
conn_string = 'postgresql://username:pass@localhost:1234/dbname'
engine = create_engine(conn_string)

# Create the table on the database
try: 
    with engine.connect() as connection:
        connection.execute('CREATE TABLE IF NOT EXISTS gdp (country_name text, country_code text, indicator_name text, indicator_code text, year int, gdp real)')
except Exception as e:
    print('Error creando la tabla gdp')
    print(e)
    

In [65]:
# Generator for reading in one line at a time
# generators are useful for data sets that are too large to fit in RAM
def extract_lines(file):
    while True:
        line = file.readline()
        if not line:
            break
        yield line

In [66]:
# Transform indicator data
def transform_indicator_data(line, colnames):
    # Filter out values that are not countries
    country = line[0]
    non_countries = ['World',
     'High income',
     'OECD members',
     'Post-demographic dividend',
     'IDA & IBRD total',
     'Low & middle income',
     'Middle income',
     'IBRD only',
     'East Asia & Pacific',
     'Europe & Central Asia',
     'North America',
     'Upper middle income',
     'Late-demographic dividend',
     'European Union',
     'East Asia & Pacific (excluding high income)',
     'East Asia & Pacific (IDA & IBRD countries)',
     'Euro area',
     'Early-demographic dividend',
     'Lower middle income',
     'Latin America & Caribbean',
     'Latin America & the Caribbean (IDA & IBRD countries)',
     'Latin America & Caribbean (excluding high income)',
     'Europe & Central Asia (IDA & IBRD countries)',
     'Middle East & North Africa',
     'Europe & Central Asia (excluding high income)',
     'South Asia (IDA & IBRD)',
     'South Asia',
     'Arab World',
     'IDA total',
     'Sub-Saharan Africa',
     'Sub-Saharan Africa (IDA & IBRD countries)',
     'Sub-Saharan Africa (excluding high income)',
     'Middle East & North Africa (excluding high income)',
     'Middle East & North Africa (IDA & IBRD countries)',
     'Central Europe and the Baltics',
     'Pre-demographic dividend',
     'IDA only',
     'Least developed countries: UN classification',
     'IDA blend',
     'Fragile and conflict affected situations',
     'Heavily indebted poor countries (HIPC)',
     'Low income',
     'Small states',
     'Other small states',
     'Not classified',
     'Caribbean small states',
     'Pacific island small states']
    
    if country not in non_countries:
            line_array = np.array(line, ndmin=2)
            line_array.reshape(1,63)
            df = pd.DataFrame(line_array, columns=colnames).replace('', np.nan)
            # Droping unnecessary columns
            df.drop(['\n'], axis=1, inplace=True)
            # Changing dataframe into long format
            df_melt = df.melt(id_vars=['Country Name', 'Country Code', 'Indicator Name', 'Indicator Code'], value_name='GDP', var_name='Years')
            results = []
            for index, row in df_melt.iterrows():
                country, countrycode, indicatorname, indicatorcode, year, gdp = row
                if str(gdp) != 'nan':
                        results.append([country, countrycode, indicatorname, indicatorcode, year, gdp])
            return results

In [67]:
def load_indicator_data(results):
    if results:
        conn_string = 'postgresql://username:pass@localhost:1234/dbname'
        engine = create_engine(conn_string)
        with engine.connect() as connection:
            for row in results:
                country, countrycode, indicatorname, indicatorcode, year, gdp = row
                connection.execute('INSERT INTO gdp (country_name, country_code, indicator_name, indicator_code, year, gdp) VALUES (%s, %s, %s, %s, %s, %s)',
                            (country, countrycode, indicatorname,indicatorcode, year, gdp))
        engine.dispose()
    return None


In [68]:
path = 'Data/gdp_data.csv'
with open(path) as f:
    data = extract_lines(f)
    for line in data:
        line = line.split(',')
        if len(line) == 63:
            for i, element in enumerate(line):
                line[i] = element.replace('"','')
            if line[0] == 'Country Name':
                colnames = line
            else:
                results = transform_indicator_data(line, colnames)
                load_indicator_data(results)
        
   

In [70]:
conn_string = 'postgresql://username:pass@localhost:1234/dbname'
engine = create_engine(conn_string)
with engine.connect() as connection:
    r = connection.execute('SELECT * FROM gdp LIMIT 10')
    for row in r:
        print(row)
engine.dispose()

('Aruba', 'ABW', 'GDP (current US$)', 'NY.GDP.MKTP.CD', 1994, 1330167600.0)
('Aruba', 'ABW', 'GDP (current US$)', 'NY.GDP.MKTP.CD', 1995, 1320670300.0)
('Aruba', 'ABW', 'GDP (current US$)', 'NY.GDP.MKTP.CD', 1996, 1379888300.0)
('Aruba', 'ABW', 'GDP (current US$)', 'NY.GDP.MKTP.CD', 1997, 1531843600.0)
('Aruba', 'ABW', 'GDP (current US$)', 'NY.GDP.MKTP.CD', 1998, 1665363100.0)
('Aruba', 'ABW', 'GDP (current US$)', 'NY.GDP.MKTP.CD', 1999, 1722798800.0)
('Aruba', 'ABW', 'GDP (current US$)', 'NY.GDP.MKTP.CD', 2000, 1873452500.0)
('Aruba', 'ABW', 'GDP (current US$)', 'NY.GDP.MKTP.CD', 2001, 1920262500.0)
('Aruba', 'ABW', 'GDP (current US$)', 'NY.GDP.MKTP.CD', 2002, 1941094900.0)
('Aruba', 'ABW', 'GDP (current US$)', 'NY.GDP.MKTP.CD', 2003, 2021301600.0)
