## **CLOUD DATA ENGINEERING FIRST PROJECT**

### **Basic extract, transform, and load (ETL) pipeline using web scraping, pandas, and SQL.**

#### Import necessary libraries

In [5]:
%pip install requests beautifulsoup4 pandas

Note: you may need to restart the kernel to use updated packages.


In [43]:
from io import StringIO
import requests
from bs4 import BeautifulSoup
import pandas as pd
import sqlite3
from datetime import datetime


### Step 1: Maintaining a Log File

This step records a timestamped log message at each stage of the ETL run. Logging is not a required part of an ETL pipeline, but it makes each stage easier to trace and debug.

In [44]:
def log_process(message):
    """
    Append a timestamped message to the pipeline log file.

    Each call adds one line of the form ``<timestamp>: <message>`` to
    ``./logs/code_log.txt``. The ``./logs`` directory is created if it does
    not exist yet, so the first call works on a fresh checkout.

    Parameters
    ----------
    message : str
        Text describing the stage of execution being recorded.

    Returns
    -------
    None
    """
    # Imported locally so this cell does not depend on an extra top-level import.
    import os

    log_path = './logs/code_log.txt'

    # open(..., 'a') creates the file but not its parent directory.
    os.makedirs(os.path.dirname(log_path), exist_ok=True)

    with open(log_path, 'a') as f:
        f.write(f'{datetime.now()}: {message}\n')

### Step 2:  Extract

In [45]:
def extract(url, table_attribute, timeout=30):
    """
    Download a web page and return one of its HTML tables as a data frame.

    The page is searched for a ``<span>`` whose text equals
    ``table_attribute``; the first ``<table>`` that follows that span is
    parsed with ``pandas.read_html``.

    Parameters
    ----------
    url : str
        Address of the page to download.
    table_attribute : str
        Exact text of the ``<span>`` that precedes the wanted table.
    timeout : float, optional
        Seconds to wait for the server before giving up (default 30).

    Returns
    -------
    pandas.DataFrame
        The first table parsed from the matched ``<table>`` element.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status.
    ValueError
        If the span, or a table following it, cannot be found.
    """
    # Without a timeout requests can block forever on an unresponsive server.
    response = requests.get(url, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page.
    response.raise_for_status()

    soup = BeautifulSoup(response.text, "html.parser")

    heading = soup.find('span', string=table_attribute)
    if heading is None:
        raise ValueError(f'No <span> with text {table_attribute!r} found at {url}')

    table = heading.find_next('table')
    if table is None:
        raise ValueError(f'No <table> found after <span> {table_attribute!r} at {url}')

    # read_html expects a file-like object for literal HTML, hence StringIO.
    df = pd.read_html(StringIO(str(table)))[0]

    log_process('Data extraction complete, Initiating transform process')

    return df

### Step 3: Transformation

In [46]:
def transform(df, csv_path):
    """
    Add EUR, GBP and INR market-cap columns to the data frame.

    Exchange rates are read from the CSV at ``csv_path`` (first column used
    as the index, rates taken from the ``Rate`` column). Each new column is
    ``Market cap (US$ billion)`` multiplied by the matching rate and rounded
    to two decimals.

    The columns are added to ``df`` itself, and the same object is returned.
    The rate mapping and the resulting frame are printed.
    """
    rates_table = pd.read_csv(csv_path, index_col=0)
    exchange_rate = rates_table.to_dict()['Rate']
    print(exchange_rate)

    usd_market_cap = df['Market cap (US$ billion)']
    currency_columns = (
        ('EUR', 'MC_EUR_Billion'),
        ('GBP', 'MC_GBP_Billion'),
        ('INR', 'MC_INR_Billion'),
    )
    for currency, column in currency_columns:
        df[column] = usd_market_cap.mul(exchange_rate[currency]).round(2)

    print(df)

    log_process('Data transformation complete. Initiating load process')

    return df

### Step 4: Load

Loading data to csv

In [47]:
def load_csv(df, output_path):
    """
    Save the data frame as a CSV file at ``output_path``.

    When ``output_path`` is a filesystem path, its parent directory is
    created if it does not exist yet. Other targets accepted by
    ``DataFrame.to_csv`` (for example open file objects) are passed through
    untouched.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to write.
    output_path : str or os.PathLike or file-like
        Destination handed to ``DataFrame.to_csv``.

    Returns
    -------
    None
    """
    # Imported locally so this cell does not depend on an extra top-level import.
    import os

    if isinstance(output_path, (str, os.PathLike)):
        parent_dir = os.path.dirname(os.fspath(output_path))
        # An empty dirname means "current directory"; nothing to create.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    df.to_csv(output_path)
    log_process('Data saved to csv.')

### Step 5: Load 

Loading data to SQL

In [48]:
def load_db(df, sql_connection, table_name):
    """
    Write the data frame to a database table called ``table_name``.

    An existing table with that name is replaced, and the frame's index is
    not stored as a column. Nothing is returned.
    """
    write_options = {'if_exists': 'replace', 'index': False}
    df.to_sql(name=table_name, con=sql_connection, **write_options)

    log_process('Data loaded to database as a table, Executing queries')

### Run Queries

In [51]:
def run_query(query_statement, sql_connection):
    """
    Execute a SQL statement and return every row it produces.

    Parameters
    ----------
    query_statement : str
        SQL text to execute.
    sql_connection : sqlite3.Connection
        Open connection to run the statement on.

    Returns
    -------
    list of tuple
        All rows fetched from the cursor (an empty list when there are none).
        Nothing is printed here; the caller decides how to display the rows.
    """
    cursor = sql_connection.cursor()
    try:
        cursor.execute(query_statement)
        result = cursor.fetchall()
    finally:
        # Release the cursor even if the statement fails.
        cursor.close()

    log_process('Process Complete')

    return result

### Executing Pipeline

In [52]:
if __name__ == '__main__':
    # Imported locally so this cell does not depend on an extra top-level import.
    import os

    # --- Extract: scrape the market-capitalisation table from the archived page.
    url = 'https://web.archive.org/web/20230908091635/https://en.wikipedia.org/wiki/List_of_largest_banks'
    table_attribute = 'By market capitalization'

    df = extract(url, table_attribute)

    # --- Transform: add the EUR / GBP / INR market-cap columns.
    csv_path = './input/exchange_rate.csv'

    # Keep the returned frame instead of relying on transform() mutating its
    # argument in place.
    df = transform(df, csv_path)

    # --- Load (CSV).
    output_path = './output/Largest_banks_data.csv'

    # Make sure the destination directory exists before writing into it.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    load_csv(df, output_path)

    # --- Load (database) and run the sample queries.
    database_name = './output/Banks.db'
    table_name = 'Largest_banks'

    # sqlite3.connect() creates the database file but not its directory.
    os.makedirs(os.path.dirname(database_name), exist_ok=True)

    conn = sqlite3.connect(database_name)
    try:
        # "with conn" only commits or rolls back the transaction; it does not
        # close the connection, so that is done in the finally clause.
        with conn:
            load_db(df, conn, table_name)

            print(run_query('SELECT * FROM Largest_banks', conn))

            print(run_query('SELECT AVG("MC_INR_Billion") FROM Largest_banks', conn))

            print(run_query('SELECT "Bank name" FROM Largest_banks LIMIT 5', conn))
    finally:
        conn.close()

{'EUR': 0.97, 'GBP': 0.8, 'INR': 85.0}
   Rank                                Bank name  Market cap (US$ billion)  \
0     1                           JPMorgan Chase                    432.92   
1     2                          Bank of America                    231.52   
2     3  Industrial and Commercial Bank of China                    194.56   
3     4               Agricultural Bank of China                    160.68   
4     5                                HDFC Bank                    157.91   
5     6                              Wells Fargo                    155.87   
6     7                        HSBC Holdings PLC                    148.90   
7     8                           Morgan Stanley                    140.83   
8     9                  China Construction Bank                    139.82   
9    10                            Bank of China                    136.81   

   MC_EUR_Billion  MC_GBP_Billion  MC_INR_Billion  
0          419.93          346.34        36798.20 