# OVERVIEW


__In this notebook, we demonstrate how to build an ETL pipeline that:__<br> 
+ extracts data in various formats (e.g. JSON, CSV) from multiple sources (e.g. a local computer, websites)
+ transforms the data
+ loads the data into a database

__There are five parts in this project:__<br>
+ [CREATE CONFIG FILE](#CREATE-CONFIG-FILE)


+ [CONSTRUCT EXTRACT CLASS](#CONSTRUCT-EXTRACT-CLASS)


+ [CONSTRUCT DATABASE CLASS](#CONSTRUCT-DATABASE-CLASS)


+ [CONSTRUCT TRANSFORMATION AND LOAD CLASS](#CONSTRUCT-TRANSFORMATION-AND-LOAD-CLASS)


+ [HOW EVERYTHING WORKS TOGETHER](#HOW-EVERYTHING-WORKS-TOGETHER)


__The scripts are written with the following principles in mind:__
+ Configurability:<br>
All properties of the data sources are kept in a JSON config file instead of being hardcoded every time we use them. Whenever something changes, e.g. an API key or the database user, we only need to update the config file.


+ Modularity:<br>
Following object-oriented design, we created a separate class for each task (extraction, database access, transformation and loading). Giving each class a specific role keeps the structure clear and makes it easy to see how the classes interact when we implement a business requirement.


+ Scalability:<br>
The code is structured so that new requirements (e.g. new logic, new features) can be handled without changing much of the original code.
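To make the configurability point concrete, here is a minimal sketch of a hypothetical helper, `update_config` (not part of this project), that changes a single value such as the database user in the JSON config file created in the next section. Because the classes below read the file when they are constructed, the change takes effect the next time an object is created, without editing any class.

```python
import json


def update_config(path, keys, value):
    """Set a nested value in a JSON config file and save the file.

    `keys` is the chain of dictionary keys leading to the value, e.g.
    ["data_sources", "PostgreSQL_cred", "user"].
    """
    with open(path) as f:
        config = json.load(f)

    # Walk down to the dictionary that holds the last key
    node = config
    for key in keys[:-1]:
        node = node[key]
    node[keys[-1]] = value

    # Write the whole config back, keeping the readable indentation
    with open(path, "w") as f:
        json.dump(config, f, indent=4)
    return config
```

For example, `update_config('data_config.json', ['data_sources', 'PostgreSQL_cred', 'user'], 'etl_user')` would switch the database user (the user name here is only an illustration).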



In [15]:
# Import libraries
import requests
import pandas as pd
import numpy as np
import json
import sqlite3
import psycopg2
from psycopg2 import Error

## CREATE CONFIG FILE

In [16]:
# Each data source maps a dataset name (used in the code) to a file path or API URL
config = {
    "data_sources": {
        "csv": {
            "CovidDeaths": "CovidDeaths.csv"
        },
        "api": {
            "Pollution": "https://api.openaq.org/v1/latest?country=FI&limit=500"
        },
        "PostgreSQL_cred": {
            "user": "postgres",
            "password": "*************",
            "host": "localhost",
            "port": "5432",
            "db_name": "PortfolioProject"
        }
    }
}

In [9]:
# Let's write a data config file
with open('data_config.json','w') as outfile:
    json.dump(config,outfile,indent=4) 

In [86]:
# Let's check whether it works
json.load(open('data_config.json'))

{'data_sources': {'csv': {'CovidDeaths': 'CovidDeaths.csv'},
  'api': {'Pollution': 'https://api.openaq.org/v1/latest?country=FI&limit=500'},
  'PostgreSQL_cred': {'user': 'postgres',
   'password': '*************',
   'host': 'localhost',
   'port': '5432',
   'db_name': 'PortfolioProject'}}}

## CONSTRUCT EXTRACT CLASS

We created a loosely-coupled class whose generic methods handle only the extraction task, so that the same script can be reused in future projects to extract data from multiple sources.

In [7]:
import pandas as pd
import requests
import json

class Extract:
    """
    The class incorporates several methods used to fetch data in various formats
    (e.g. csv, json) from multiple sources
    """
    
    def __init__(self):
        """
        Constructor creates an Extract object and assigns values to instance 
        variables (i.e: self.data_sources, self.api, self.csv_path)
        """
        # Load the config file once so it can be used across the class methods
        with open('data_config.json') as config_file:
            self.data_sources = json.load(config_file)
        
        # Assign the values of keys 'api' and 'csv' to instance variables
        self.api = self.data_sources['data_sources']['api']
        self.csv_path = self.data_sources['data_sources']['csv']
        
    def getAPIsdata(self, dataset):
        """
        Function fetches JSON data from the API URL stored in the config file
        Function requires one argument, dataset, the name of the dataset as set in
        the config file (e.g. 'Pollution')
        Function returns the parsed JSON response (a Python dict)
        """
        api_url = self.api[dataset]
        response = requests.get(api_url, timeout=30)
        # Raise an error for 4xx/5xx responses instead of parsing an error page
        response.raise_for_status()
        return response.json()
    
    def getCSVdata(self, csv_name):
        """
        Function reads a CSV file whose path is stored in the config file (a relative
        path is resolved against the current working directory)
        Function requires one argument, csv_name, which is not the file name but
        rather the name set in the config file (e.g. 'CovidDeaths')
        Function returns a pandas DataFrame
        """
        return pd.read_csv(self.csv_path[csv_name])
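`Extract` hard-codes `'data_config.json'` in its constructor. If we want to reuse the class with a different config file in a future project, one option is to pass the path in. The sketch below is a hypothetical variant, `ConfigExtract`, that does this; it reads CSVs with the standard-library `csv` module purely so that it can be tried without pandas or network access, and it is not a replacement for the pandas-based method above.

```python
import csv
import json


class ConfigExtract:
    """Hypothetical variant of Extract whose config path is injected, not hard-coded."""

    def __init__(self, config_path="data_config.json"):
        with open(config_path) as f:
            self.data_sources = json.load(f)["data_sources"]

    def csv_path(self, csv_name):
        # Resolve the dataset name from the config (e.g. "CovidDeaths") to a file path
        return self.data_sources["csv"][csv_name]

    def get_csv_rows(self, csv_name):
        # Return every row as a dict keyed by the CSV header (all values are strings)
        with open(self.csv_path(csv_name), newline="") as f:
            return list(csv.DictReader(f))
```

Usage mirrors `getCSVdata`: `ConfigExtract('data_config.json').get_csv_rows('CovidDeaths')`.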


## CONSTRUCT DATABASE CLASS

We created a separate class to handle all interaction with the PostgreSQL database (creating tables, inserting data and reading data), so it can be reused whenever a future project needs to work with PostgreSQL.

In [None]:
import pandas as pd
import psycopg2
from psycopg2 import Error

class PostgreSQL:
    """
    This class is used to handle everything relating to the PostgreSQL database, ranging
    from creating tables and inserting data to reading data
    """
    
    def __init__(self, user, password, host, db_name, port):
        """
        Constructor creates the PostgreSQL object and connects to the PostgreSQL database
        It requires the connection details of the database we would like to connect to
        """
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.db_name = db_name
        # Defined up front so that close_connection() is safe even if connecting fails
        self.conn = None
        self.cursor = None
        try:
            # Connect to the database of interest 
            self.conn = psycopg2.connect(user=self.user,
                                         password=self.password,
                                         host=self.host,
                                         port=self.port,
                                         database=self.db_name)
            
            # Open a cursor to work with the database
            self.cursor = self.conn.cursor()
            
            print('PostgreSQL has been successfully connected')
        except (Exception, Error) as error:
            print('Failed to connect to PostgreSQL database ', error)
    
    def _rollback(self):
        # A failed statement aborts the transaction; roll back so later queries can run
        if self.conn is not None:
            self.conn.rollback()
        
    def create_table_in_db(self, table_name, dict_col_types):
        """
        Function is used to create a new table in the database (an existing table with
        the same name is dropped first)
        Function requires 2 parameters:
        + table_name which is the name of the table we want to create
        + dict_col_types which is a dictionary whose keys are column names and values are
        the data types they store
        """
        try:
            # Prepare the queries, e.g. CREATE TABLE t (city varchar(50), value real);
            # table and column names are concatenated, so they must come from trusted code
            drop_query = 'DROP TABLE IF EXISTS ' + table_name
            columns = ', '.join(key + ' ' + col_type
                                for key, col_type in dict_col_types.items())
            create_query = 'CREATE TABLE ' + table_name + ' (' + columns + ');'
            
            # Execute the queries and save 
            self.cursor.execute(drop_query) 
            self.cursor.execute(create_query)
            self.conn.commit()
            
            print('Table created successfully in PostgreSQL')
        except (Exception, Error) as error:
            self._rollback()
            print('Error while creating table in PostgreSQL', error)
        
    def load_data_to_db(self, table_name, list_of_col_names, obs):
        """
        Function is used to load the data into the database
        Function requires 3 parameters:
        + table_name is the name of the table that we would like to load the data into
        + list_of_col_names is the list of the column names of that table
        + obs accepts either a DataFrame of observations or a sequence of rows
        """
        try:
            # Build e.g. INSERT INTO t (city, value) VALUES (%s, %s); psycopg2 binds the values
            placeholders = ', '.join(['%s'] * len(list_of_col_names))
            full_query = ('INSERT INTO ' + table_name + ' (' + ', '.join(list_of_col_names)
                          + ') VALUES (' + placeholders + ')')
            
            # Convert a DataFrame to a list of rows holding plain Python values
            if isinstance(obs, pd.DataFrame):
                list_load = obs.values.tolist()
            else:
                list_load = obs
            
            # Execute the query once per row and save
            self.cursor.executemany(full_query, list_load) 
            self.conn.commit()
            
            print('Rows inserted successfully')
        except (Exception, Error) as error:
            self._rollback()
            print('Error while inserting rows into PostgreSQL', error)
    
    def read_data_from_db(self, table_name, col_name='all'):
        """
        Function is used to read the data from the database
        Function requires two parameters:
        + table_name is the name of the table that we would like to read data from
        + col_name is either 'all' (the default) or a list of column names of that table
        Function prints and returns the rows of the table and columns of interest
        """
        try:
            if col_name == 'all':
                read_query = 'SELECT * FROM ' + table_name
            else:
                read_query = 'SELECT ' + ', '.join(col_name) + ' FROM ' + table_name
            self.cursor.execute(read_query)
            rows = self.cursor.fetchall()
            for row in rows:
                print(row)
            print('Read successfully')
            return rows
        except (Exception, Error) as error:
            self._rollback()
            print('Error while fetching data from the PostgreSQL database', error)
    
    def close_connection(self):
        """
        Function is used to close the connection to the database 
        """
        if self.conn is not None:
            if self.cursor is not None:
                self.cursor.close()
            self.conn.close()
            self.conn = None
            print('PostgreSQL connection is closed')
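The class builds its SQL by joining column names and one placeholder per column. The sketch below shows that pattern with the standard-library `sqlite3` module so it runs without a PostgreSQL server; the helper names `build_create` and `build_insert` are hypothetical. Note that `sqlite3` uses `?` as its placeholder, whereas psycopg2 uses `%s`. As in the class above, only the values are bound as parameters; table and column names are concatenated, so they should come from our own code or config.

```python
import sqlite3


def build_create(table_name, dict_col_types):
    # "continent varchar(30), location varchar(50), ..." from the column -> type mapping
    cols = ", ".join(f"{name} {dtype}" for name, dtype in dict_col_types.items())
    return f"CREATE TABLE {table_name} ({cols})"


def build_insert(table_name, col_names, placeholder):
    # One placeholder per column; the driver fills in the values
    marks = ", ".join([placeholder] * len(col_names))
    return f"INSERT INTO {table_name} ({', '.join(col_names)}) VALUES ({marks})"


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
col_types = {"continent": "varchar(30)", "location": "varchar(50)", "total_cases": "bigint"}
cur.execute("DROP TABLE IF EXISTS covid_demo")
cur.execute(build_create("covid_demo", col_types))
# Sample rows taken from the notebook output further below
cur.executemany(
    build_insert("covid_demo", list(col_types), "?"),
    [("Africa", "Algeria", 217265), ("Africa", "Angola", 76787)],
)
conn.commit()
rows = cur.execute("SELECT * FROM covid_demo ORDER BY location").fetchall()
conn.close()
```

With psycopg2 the same `build_insert(..., "%s")` string would be passed to `cursor.executemany`.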

## CONSTRUCT TRANSFORMATION AND LOAD CLASS

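This class handles the data transformation as well as loading the transformed data into the database. It reuses the two classes above, importing them from the `DataSources` and `Databases` modules.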

In [19]:
from DataSources import Extract
from Databases import PostgreSQL
import pandas as pd
import numpy as np

class Transformation_Load:
    """
    This class handles the transformation as well as loading the transformed data to the 
    PostgreSQL database
    """
    
    def __init__(self, dataSource, dataSet):
        """
        Constructor creates a Transformation_Load object: the data to be transformed 
        and a newly created connection to the database are assigned to the instance
        variables self.data and self.psql
        """
        # Create Extract object
        extractObj = Extract()
    
        # Fetch the data to be transformed
        if dataSource == 'api':
            self.data = extractObj.getAPIsdata(dataSet)
        elif dataSource == 'csv':
            self.data = extractObj.getCSVdata(dataSet)
        else:
            # Stop here: without data, the transformation methods cannot run
            raise ValueError("Unknown data source: " + str(dataSource) + ". Expected 'api' or 'csv'")
        
        # Create connection to the database of interest
        cred= extractObj.data_sources['data_sources']['PostgreSQL_cred']
        user= cred['user']
        password= cred['password']
        host= cred['host']
        db_name= cred['db_name']
        port= cred['port']
        self.psql = PostgreSQL( user, password, host, db_name, port)
        
        
    # Covid Deaths data generic structure
    def csv_CovidDeaths(self, table_name):
        """
        Function does all transformations relating to the CovidDeaths data, i.e.
        selecting the rows that have non-null values in the continent column and
        computing the total number of COVID cases by country
        Function requires 1 parameter, table_name, the name of the table we would like
        to insert the data into
        """

        # Drop rows having no value in the continent column
        deaths_no_nul_cont = self.data[self.data['continent'].notnull()]

        # Compute total cases by country as the sum of the daily new cases
        full2 = deaths_no_nul_cont.groupby(['continent', 'location'])['new_cases']\
                                  .agg('sum')\
                                  .reset_index()\
                                  .rename({'new_cases': 'total_cases'}, axis=1)

        # Load the transformed data to the PostgreSQL database of interest
        dict_col_types_covid = {'continent': 'varchar(30)',
                                'location': 'varchar(50)', 'total_cases': 'bigint'}
        list_of_col_names_covid = list(full2.columns)
        self.psql.create_table_in_db(table_name, dict_col_types_covid)
        self.psql.load_data_to_db(table_name, list_of_col_names_covid, full2)
        self.psql.close_connection()
        
        
        
    # Air Pollution data generic structure
    def api_Pollution(self, table_name):
        """
        Function does all transformations relating to the pollution data, i.e.
        keeping only the fields relevant to us, with one row per measurement
        Function requires 1 parameter, table_name, the name of the table we would like
        to insert the data into
        """
        # Extract only the relevant fields: one row per measurement of each location
        air_list = []
        for ele in self.data['results']:
            for mea in ele['measurements']:
                air_dict = {}
                air_dict['city'] = ele['city']
                air_dict['coordinates'] = ele['coordinates']
                air_dict['country'] = ele['country']
                air_dict['parameter'] = mea['parameter']
                air_dict['value'] = mea['value']
                air_dict['unit'] = mea['unit']
                air_list.append(air_dict)

        # Split the nested coordinates dict into latitude and longitude columns
        coor_nested = pd.DataFrame(air_list)
        full = pd.concat((coor_nested, coor_nested['coordinates'].apply(pd.Series)), axis=1)\
                 .drop('coordinates', axis=1)

        # Load the transformed data to the PostgreSQL database of interest
        dict_col_types = {'city': 'varchar(50)', 'country': 'text',
                          'parameter': 'text', 'value': 'real', 'unit': 'text', 
                          'latitude': 'real', 'longitude': 'real'}
        list_of_col_names = list(full.columns)
        self.psql.create_table_in_db(table_name, dict_col_types)
        self.psql.load_data_to_db(table_name, list_of_col_names, full)
        self.psql.close_connection()

## HOW EVERYTHING WORKS TOGETHER

In [21]:
# Import modules 
from DataSources import Extract
from Databases import PostgreSQL
from DataTransformationLoad import Transformation_Load

__First, let's fetch the data and take a brief look at its original structure__

In [23]:
# Create Extract object
ex= Extract()
# Fetch pollution data using API
ex.getAPIsdata('Pollution')

{'meta': {'name': 'openaq-api',
  'license': 'CC BY 4.0d',
  'website': 'https://u50g7n0cbj.execute-api.us-east-1.amazonaws.com/',
  'page': 1,
  'limit': 500,
  'found': 56},
 'results': [{'location': 'FI00606',
   'city': 'South Karelia',
   'country': 'FI',
   'coordinates': {'latitude': 61.0557599995458, 'longitude': 28.18431},
   'measurements': [{'parameter': 'pm25',
     'value': 3,
     'lastUpdated': '2016-12-21T10:00:00Z',
     'unit': 'µg/m³',
     'sourceName': 'EEA Finland',
     'averagingPeriod': {'value': 3600, 'unit': 'seconds'}},
    {'parameter': 'pm10',
     'value': 3.4624311,
     'lastUpdated': '2022-03-03T17:00:00Z',
     'unit': 'µg/m³',
     'sourceName': 'EEA Finland',
     'averagingPeriod': {'value': 3600, 'unit': 'seconds'}},
    {'parameter': 'no2',
     'value': 6.6023043,
     'lastUpdated': '2022-03-03T17:00:00Z',
     'unit': 'µg/m³',
     'sourceName': 'EEA Finland',
     'averagingPeriod': {'value': 3600, 'unit': 'seconds'}}]},
  {'location': 'FI004

It works: the API returns JSON, which `response.json()` parses into nested Python dictionaries and lists.
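In this JSON, each location carries a nested list of `measurements`, and `api_Pollution` flattens it into one row per measurement with the coordinates split into latitude and longitude. A standard-library sketch of that flattening, using a trimmed copy of the first result above as sample input; the function name `flatten_measurements` is hypothetical, and the fallback for missing coordinates is an assumption rather than something observed in the output.

```python
def flatten_measurements(payload):
    """Turn {'results': [{..., 'measurements': [...]}, ...]} into flat row dicts."""
    rows = []
    for location in payload["results"]:
        # Fall back to an empty dict in case a location has no coordinates
        coords = location.get("coordinates") or {}
        for mea in location["measurements"]:
            # One row per measurement, in the column order of the Pollution table
            rows.append({
                "city": location["city"],
                "country": location["country"],
                "parameter": mea["parameter"],
                "value": mea["value"],
                "unit": mea["unit"],
                "latitude": coords.get("latitude"),
                "longitude": coords.get("longitude"),
            })
    return rows


# Trimmed copy of the first location in the API response shown above
sample = {
    "results": [{
        "location": "FI00606",
        "city": "South Karelia",
        "country": "FI",
        "coordinates": {"latitude": 61.0557599995458, "longitude": 28.18431},
        "measurements": [
            {"parameter": "pm25", "value": 3, "unit": "µg/m³"},
            {"parameter": "pm10", "value": 3.4624311, "unit": "µg/m³"},
            {"parameter": "no2", "value": 6.6023043, "unit": "µg/m³"},
        ],
    }]
}
rows = flatten_measurements(sample)
```

The single location above yields three rows, one for each of `pm25`, `pm10` and `no2`.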

In [25]:
# Fetch covid data from local computer
ex.getCSVdata('CovidDeaths')

Unnamed: 0,iso_code,continent,location,date,population,total_cases,new_cases,new_cases_smoothed,total_deaths,new_deaths,...,new_deaths_smoothed_per_million,reproduction_rate,icu_patients,icu_patients_per_million,hosp_patients,hosp_patients_per_million,weekly_icu_admissions,weekly_icu_admissions_per_million,weekly_hosp_admissions,weekly_hosp_admissions_per_million
0,AFG,Asia,Afghanistan,24/02/2020,39835428.0,5.0,5.0,,,,...,,,,,,,,,,
1,AFG,Asia,Afghanistan,25/02/2020,39835428.0,5.0,0.0,,,,...,,,,,,,,,,
2,AFG,Asia,Afghanistan,26/02/2020,39835428.0,5.0,0.0,,,,...,,,,,,,,,,
3,AFG,Asia,Afghanistan,27/02/2020,39835428.0,5.0,0.0,,,,...,,,,,,,,,,
4,AFG,Asia,Afghanistan,28/02/2020,39835428.0,5.0,0.0,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
150624,ZWE,Africa,Zimbabwe,24/12/2021,15092171.0,202736.0,1392.0,1881.286,4871.0,16.0,...,0.871,0.88,,,,,,,,
150625,ZWE,Africa,Zimbabwe,25/12/2021,15092171.0,203746.0,1010.0,2025.571,4885.0,14.0,...,1.003,0.86,,,,,,,,
150626,ZWE,Africa,Zimbabwe,26/12/2021,15092171.0,204351.0,605.0,1811.143,4891.0,6.0,...,1.032,0.86,,,,,,,,
150627,ZWE,Africa,Zimbabwe,27/12/2021,15092171.0,205449.0,1098.0,1481.429,4908.0,17.0,...,0.975,,,,,,,,,


This works as well: the raw DataFrame has around 150,000 rows and 26 columns.
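`csv_CovidDeaths` reduces this table to one row per country by dropping rows with no `continent` and summing `new_cases` per (`continent`, `location`). The sketch below reproduces that aggregation with the standard library on a few hypothetical rows (in the string form `csv.DictReader` would produce), so the logic can be checked without pandas; `total_cases_by_country` is a hypothetical name.

```python
from collections import defaultdict


def total_cases_by_country(rows):
    """Sum new_cases per (continent, location), skipping rows without a continent."""
    totals = defaultdict(float)
    for row in rows:
        if not row["continent"]:
            continue  # plays the role of self.data['continent'].notnull()
        key = (row["continent"], row["location"])
        value = row["new_cases"]
        # Missing values count as 0, matching pandas' sum(), which skips NaN
        totals[key] += float(value) if value != "" else 0.0
    # One (continent, location, total_cases) tuple per country, sorted by group key
    return [(cont, loc, total) for (cont, loc), total in sorted(totals.items())]


# Hypothetical rows, including one without a continent
sample = [
    {"continent": "Asia", "location": "Afghanistan", "new_cases": "5"},
    {"continent": "Asia", "location": "Afghanistan", "new_cases": "0"},
    {"continent": "Africa", "location": "Zimbabwe", "new_cases": "1392"},
    {"continent": "Africa", "location": "Zimbabwe", "new_cases": ""},
    {"continent": "", "location": "World", "new_cases": "100"},
]
result = total_cases_by_country(sample)
```

Here `result` holds one tuple per country, e.g. `('Asia', 'Afghanistan', 5.0)`, and the row without a continent is excluded.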

__Next, let's apply the transformations and load the data into the database using the Transformation_Load class__

In [116]:
# Count the datasets of each format; the count is also appended to the table name
# to distinguish between tables created from the same format
countAPI = 0
countCSV = 0

# Loop through the data sources defined in data_config.json, skipping the database
# credentials; only the first dataset of each source type is processed
with open('data_config.json') as config_file:
    data_sources = json.load(config_file)['data_sources']

for (sources, names) in data_sources.items():
    if sources != 'PostgreSQL_cred':
        name_used = list(names)[0]
        tl = Transformation_Load(sources, name_used)
        
        if sources == 'api':
            tl.api_Pollution(name_used + str(countAPI))
            countAPI += 1
        
        elif sources == 'csv':
            tl.csv_CovidDeaths(name_used + str(countCSV))
            countCSV += 1

PostgreSQL has been successfully connected
Table created successfully in PostgreSQL
Rows inserted successfully
PostgreSQL connection is closed
PostgreSQL has been successfully connected
Table created successfully in PostgreSQL
Rows inserted successfully
PostgreSQL connection is closed
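The loop above picks a transformation with one `if`/`elif` branch per source type and processes only the first dataset of each. In keeping with the scalability goal, one alternative is a registry that maps each (source, dataset) pair from the config to its transformation, so supporting a new dataset means adding one entry rather than another branch. This is a minimal sketch with hypothetical names: the stub functions only return the table name they would load, whereas in the notebook they would call `Transformation_Load(source, name).api_Pollution(table)` and so on. Unlike the loop above, it processes every dataset listed under each source.

```python
def load_pollution(table_name):
    # Stand-in for Transformation_Load('api', 'Pollution').api_Pollution(table_name)
    return ("api", table_name)


def load_covid(table_name):
    # Stand-in for Transformation_Load('csv', 'CovidDeaths').csv_CovidDeaths(table_name)
    return ("csv", table_name)


# Hypothetical registry: (source type, dataset name in the config) -> transformation
TRANSFORMS = {
    ("api", "Pollution"): load_pollution,
    ("csv", "CovidDeaths"): load_covid,
}


def run_pipeline(data_sources):
    """Run the transformation registered for every dataset in data_sources."""
    counts = {}
    results = []
    for source, datasets in data_sources.items():
        if source == "PostgreSQL_cred":
            continue  # credentials, not a dataset
        for name in datasets:
            # Suffix each table with a per-source counter, as in the notebook (Pollution0, ...)
            idx = counts.get(source, 0)
            counts[source] = idx + 1
            results.append(TRANSFORMS[(source, name)](name + str(idx)))
    return results
```

Called with the `data_sources` section of the config, it would return `[('csv', 'CovidDeaths0'), ('api', 'Pollution0')]`, matching the table names created above.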


__Last but not least, let's run a check to see if the data has been transformed and inserted into the database__

In [74]:
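# Let's use the database information stored in the config file to connect to it
with open('data_config.json') as config_file:
    cred = json.load(config_file)['data_sources']['PostgreSQL_cred']
user = cred['user']
password = cred['password']
host = cred['host']
port = cred['port']
db_name = cred['db_name']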

In [117]:
psql= PostgreSQL(user=user, password=password,host=host,port=port,db_name=db_name)

PostgreSQL has been successfully connected


In [118]:
# Let's check whether the data has been inserted into the database
with open('data_config.json') as config_file:
    data_sources = json.load(config_file)['data_sources']

for (sources, names) in data_sources.items():
    if sources != 'PostgreSQL_cred':
        name_used = list(names)[0]
        if sources == 'api':
            for idx in range(countAPI):
                psql.read_data_from_db(name_used + str(idx))
            print('End of files fetched by API')
        elif sources == 'csv':
            for idx in range(countCSV):
                psql.read_data_from_db(name_used + str(idx))
            print('End of CSV files')
psql.close_connection()

('Africa', 'Algeria', 217265)
('Africa', 'Angola', 76787)
('Africa', 'Benin', 24935)
('Africa', 'Botswana', 212482)
('Africa', 'Burkina Faso', 17632)
('Africa', 'Burundi', 26999)
('Africa', 'Cameroon', 109367)
('Africa', 'Cape Verde', 39345)
('Africa', 'Central African Republic', 12163)
('Africa', 'Chad', 5703)
('Africa', 'Comoros', 5867)
('Africa', 'Congo', 20089)
('Africa', "Cote d'Ivoire", 65066)
('Africa', 'Democratic Republic of Congo', 72349)
('Africa', 'Djibouti', 13603)
('Africa', 'Egypt', 383003)
('Africa', 'Equatorial Guinea', 13637)
('Africa', 'Eritrea', 7931)
('Africa', 'Eswatini', 64873)
('Africa', 'Ethiopia', 405745)
('Africa', 'Gabon', 39910)
('Africa', 'Gambia', 10136)
('Africa', 'Ghana', 140221)
('Africa', 'Guinea', 31238)
('Africa', 'Guinea-Bissau', 6474)
('Africa', 'Kenya', 285654)
('Africa', 'Lesotho', 28126)
('Africa', 'Liberia', 6228)
('Africa', 'Libya', 386878)
('Africa', 'Madagascar', 49590)
('Africa', 'Malawi', 72135)
('Africa', 'Mali', 20287)
('Africa', 'Mauri

Data of all formats has been successfully transformed and inserted into the database. For example, the original Covid data has around 150,000 rows and 26 columns; after the transformation only 3 columns remain (continent, location and the total number of people who tested positive for COVID), and each row represents one country.
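To back the "one row per country" claim with a query rather than by reading the printed rows, we can look for countries that appear more than once. A minimal sketch, demonstrated on an in-memory `sqlite3` table so it runs without PostgreSQL; the helper name `duplicate_countries` is hypothetical, and with psycopg2 the same SQL would be run through `cursor.execute` followed by `cursor.fetchall()`.

```python
import sqlite3


def duplicate_countries(conn, table_name):
    # Countries appearing more than once; an empty list means one row per country
    query = (f"SELECT continent, location, COUNT(*) FROM {table_name} "
             "GROUP BY continent, location HAVING COUNT(*) > 1")
    return conn.execute(query).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE covid_demo "
             "(continent varchar(30), location varchar(50), total_cases bigint)")
# Sample rows taken from the output above
conn.executemany("INSERT INTO covid_demo VALUES (?, ?, ?)",
                 [("Africa", "Algeria", 217265), ("Africa", "Angola", 76787)])
before = duplicate_countries(conn, "covid_demo")

# Insert a deliberate duplicate to show what a failed check looks like
conn.execute("INSERT INTO covid_demo VALUES ('Africa', 'Angola', 1)")
after = duplicate_countries(conn, "covid_demo")
conn.close()
```

`before` is an empty list, while `after` reports `('Africa', 'Angola', 2)`.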