# Bank for International Settlements (BIS) - Data Pipeline
__Team 6 - Lane Whitmore and Dave Friesen__<br>
__ADS-507-02-SP23__<br><br>
__GitHub link: https://github.com/lanewhitmore/BIS_Data_Pipeline__

In [1]:
# Notebook metadata: authors and contacts are parallel lists (same order)
__authors__ = ['Lane Whitmore', 'Dave Friesen']
__contact__ = ['lwhitmore@sandiego.edu', 'dfriesen@sandiego.edu']
__date__ = '2023-02-04'
__license__ = 'MIT'
__version__ = '1.0.0'

# Setup

In [7]:
# Import utility libraries
import http.client
import logging
import os
import urllib.error
import zipfile
from datetime import datetime
from urllib.request import urlretrieve

# Import data and pipeline libraries
import MySQLdb
import pandas as pd
import pymysql as mysql

ModuleNotFoundError: No module named 'MySQLdb'

# Data Extract and Load

In [2]:
# This function gets a simple file row count for data load confirmation
def ctrl_count(fname):
    """
    Count the lines in a text file.

    The count includes every line in the file (so a CSV header row is
    counted too).

    :param fname: Path of the file to count.
    :return: Number of lines in the file; 0 for an empty file.
    """
    # The context manager closes the handle even if reading raises
    #   (e.g., a decoding error part-way through the file)
    with open(fname) as f:
        return sum(1 for _ in f)

In [3]:
# Define file location
url = 'https://www.bis.org/statistics/'

# Define dataset details
ds = {'exr': {'efname': 'full_xru_csv.zip',  # Name of file at above url
              'lfname': 'WS_XRU_csv_col.csv',  # Name of unzipped file
              'id_vars': range(0, 16)},  # ID (vs. value) columns
      'cp': {'efname': 'full_long_cpi_csv.zip',
             'lfname': 'WS_LONG_CPI_csv_col.csv',
             'id_vars': range(0, 14)},
      'pr': {'efname': 'full_cbpol_m_csv.zip',
             'lfname': 'WS_CBPOL_M_csv_col.csv',
             'id_vars': range(0, 13)}}

# Establish dataframe dictionaries - these are accessible by dataset abbreviation and
#   this allows for consistent code with flexibility. The keys are derived from `ds`
#   so the three dictionaries cannot drift out of sync with the dataset definition
df = dict.fromkeys(ds)  # To be used for full dataframe
df_ids = dict.fromkeys(ds)  # To be used for subset of IDs only
df_values = dict.fromkeys(ds)  # To be used for subset of values only

# Define download (extract) and load location - In this example, it assumes that the
#   default (code) directory is 'src', and that a 'data' directory exists in parallel;
#   this can be configured to anything
ds_path = '../data/'

# Setup logging - the level is set explicitly because the root logger defaults to
#   WARNING, which would silently drop the pipeline's logging.info() messages
#   (file retrieved / unzipped / control counts) from the log file
logging.basicConfig(filename=ds_path+'pipeline.log', filemode='w', force=True,
                    level=logging.INFO,
                    format='%(asctime)s:%(levelname)s:%(message)s')

# Iterate dataset: download, unzip, load and split each configured file
for i in ds:
    # Show we're doing something
    print('Processing '+ds[i]['efname'], end='... ')

    # Download file; if error, log and skip to next file in dataset.
    #   HTTPError is a subclass of URLError, so it must be caught first
    try:
        urlretrieve(url + ds[i]['efname'], ds_path+ds[i]['efname'])
        logging.info(ds[i]['efname']+' retrieved')
    except urllib.error.HTTPError as e:
        logging.error('retrieve_HTTP error_'+str(e.code)+'_'+str(e.reason))
        continue
    except http.client.HTTPException as e:
        # Protocol-level failures carry no code/reason attributes; log the exception itself
        logging.error('retrieve_HTTP exception_'+repr(e))
        continue
    except urllib.error.URLError as e:
        # URLError has no status code, and its reason may be an exception object
        logging.error('retrieve_URL error_'+str(e.reason))
        continue

    # Unzip file; if error, log and skip to next file in dataset
    try:
        with zipfile.ZipFile(ds_path+ds[i]['efname'], 'r') as zf:
            zf.extractall(ds_path)
        logging.info(ds[i]['efname']+' unzipped')
    except FileNotFoundError:
        logging.error('unzip_File not found')
        continue
    except zipfile.BadZipFile:
        logging.error('unzip_Bad zip file')
        continue
    except zipfile.LargeZipFile:
        logging.error('unzip_Large zip file')
        continue

    # Get control count (row count from raw CSV)
    ctrl = ctrl_count(ds_path+ds[i]['lfname'])

    # Load dataframe
    df[i] = pd.read_csv(ds_path+ds[i]['lfname'], on_bad_lines='skip', low_memory=False)

    # Confirm control counts
    delta = ctrl - len(df[i])
    logging.info('{0}_file={1}_import={2}_delta={3}'.format(
        ds[i]['lfname'], ctrl, len(df[i]), delta))

    # If control count is off by more than one row (i.e., assuming header),
    #   log as warning
    if delta > 1:
        logging.warning('control total exception')

    # The following subsets the full dataframe into two parts: one each for IDs and values
    #   This is so that the pivoted values don't repeat all of the IDs unnecessarily
    #   Both subsets retain the original index so can be joined that way in further
    #     processing
    df_ids[i] = df[i].iloc[:, ds[i]['id_vars']]
    df_values[i] = df[i].iloc[:, max(ds[i]['id_vars'])+1:]
    # Melt every value column into (variable, value) rows; ignore_index=False keeps the
    #   source row's index on each melted row (melt would otherwise renumber from 0 and
    #   the values could no longer be joined back to df_ids)
    df_values[i] = pd.melt(df_values[i], ignore_index=False)

    print()

Processing full_xru_csv.zip... 
Processing full_long_cpi_csv.zip... 
Processing full_cbpol_m_csv.zip... 


# Data Transformation

In [8]:
### Helpers that connect to the MySQL database (settings read from environment variables) and load dataframes into its tables
def run_connection(db_connection: "mysql.connections.Connection", syntax: str,
                   args: "tuple | list | dict | None" = None) -> None:
    """
    Execute a single SQL statement on an open connection.

    A cursor is opened for the statement and always closed afterwards, even
    when execution raises. The statement is not committed here; committing is
    left to the caller.

    :param db_connection: DB connection object.
    :param syntax: SQL statement to execute.
    :param args: Optional values bound to placeholders in ``syntax`` by the
        driver; when omitted the statement is executed as written.
    """
    cur = db_connection.cursor()
    try:
        cur.execute(syntax, args)
    finally:
        # Release the cursor whether or not the statement succeeded
        cur.close()

def create_table(schema: str, table: str) -> None:
    """
    Create a table in the configured database if it does not already exist.

    Connection settings are read from the HOST, PORT, USER, PASSWORD and
    DB_NAME environment variables. ``table`` and ``schema`` are interpolated
    directly into the DDL statement (identifiers cannot be bound as
    parameters), so they must come from trusted code, not user input.

    :param schema: Text placed between the parentheses of CREATE TABLE,
        i.e. the column definitions for the table.
    :param table: The name of the table to create.
    """
    db_conn = mysql.connect(
        host=os.environ.get("HOST"),
        # Fall back to the standard MySQL port instead of failing on int(None)
        port=int(os.environ.get("PORT", 3306)),
        user=os.environ.get("USER"),
        passwd=os.environ.get("PASSWORD"),
        db=os.environ.get("DB_NAME"),
    )

    try:
        run_connection(db_connection=db_conn, syntax=f"CREATE TABLE IF NOT EXISTS {table}({schema})")
        db_conn.commit()
    finally:
        # Always release the connection, even when the statement fails
        db_conn.close()

def populate_table(table_name: str, df: pd.DataFrame) -> None:
    """
    Insert the rows of a dataframe into an existing database table.

    The table's column names are read from the database; an
    ``upload_timestamp`` column is added to a copy of ``df`` (the caller's
    dataframe is left untouched), the columns are reordered to match the
    table, and all rows are inserted with a parameterized statement and
    committed together.

    Connection settings are read from the HOST, PORT, USER, PASSWORD and
    DB_NAME environment variables. ``table_name`` is interpolated into the SQL
    text (identifiers cannot be bound as parameters), so it must come from
    trusted code.

    :param table_name: Name of the table within the DB
    :param df: df to be inserted into schema/DB
    :raises AssertionError: If the table has columns that ``df`` lacks.
    """
    db_conn = mysql.connect(
        host=os.environ.get("HOST"),
        # Fall back to the standard MySQL port instead of failing on int(None)
        port=int(os.environ.get("PORT", 3306)),
        user=os.environ.get("USER"),
        passwd=os.environ.get("PASSWORD"),
        db=os.environ.get("DB_NAME"),
    )

    try:
        # Discover the table's column names without fetching any rows; the
        #   description is read before the cursor is closed
        cur = db_conn.cursor()
        try:
            cur.execute(f"SELECT * FROM {table_name} LIMIT 0")
            col_names = [col[0] for col in cur.description]
        finally:
            cur.close()

        # assign() returns a new frame, so the caller's dataframe is not modified
        df = df.assign(upload_timestamp=datetime.now().strftime("%m-%d-%Y %H:%M:%S"))

        missing_columns = set(col_names).difference(df.columns)
        if missing_columns:
            # Raised explicitly (not via `assert`) so the check still runs under
            #   `python -O`; the exception type is unchanged for existing callers
            raise AssertionError(
                f"The columns listed are missing from your dataset: {','.join(sorted(missing_columns))}"
            )

        df = df[col_names]

        # Convert to plain Python objects and map NaN/NaT to None so the driver
        #   sends proper values (and NULLs) instead of text such as `nan`
        rows = df.astype(object).where(df.notna(), None)
        records = list(rows.itertuples(index=False, name=None))

        if records:
            placeholders = ", ".join(["%s"] * len(col_names))
            cur = db_conn.cursor()
            try:
                # Values are bound by the driver, which handles quoting/escaping
                cur.executemany(f"INSERT INTO {table_name} VALUES ({placeholders})", records)
            finally:
                cur.close()

        db_conn.commit()
    finally:
        # Always release the connection, even when validation or an insert fails
        db_conn.close()
    


In [9]:
# Load the melted value frames into their tables.
# NOTE(review): the recorded run of this cell stopped with an AssertionError reporting
#   `exr_val_id` as missing, i.e. the exchange_rate_values table has a column that
#   df_values['exr'] does not provide - confirm how that column should be populated.
populate_table(table_name = "exchange_rate_values", df = df_values['exr'])
populate_table(table_name = "policy_rate_values", df = df_values['pr'])
populate_table(table_name = "customer_prices_values", df = df_values['cp'])

# Load selected ID columns (chosen by position) into their tables.
# NOTE(review): df_ids is built from id_vars range(0, 16) / range(0, 13) / range(0, 14),
#   so valid positions end at 15 / 12 / 13; the last position in each list below
#   (16, 13, 14) looks out of range - confirm the intended columns.
populate_table(table_name = "exchange_rate_id", df = df_ids['exr'].iloc[:,[1,3,5,7,9,10,13,14,15,16]])
populate_table(table_name = "policy_rate_id", df = df_ids['pr'].iloc[:,[1,3,6,7,9,10,11,12,13]])
populate_table(table_name = "customer_prices_id", df = df_ids['cp'].iloc[:,[1,3,5,13,14]])

AssertionError: The columns listed are missing from your dataset: exr_val_id

# Data Consumption

In [6]:
#