In [None]:
# !pip install SQLAlchemy
# !pip install psycopg2
# !pip install pandas
# !pip install PyYAML
# !pip install tabula-py
# !pip install requests
# !pip install boto3

In [1]:
from sqlalchemy import create_engine, MetaData, Table, select
from dateutil.parser import parse
import pandas as pd
import numpy as np
import psycopg2
import yaml
import re
import boto3
import tabula

# using the requests library to GET the number of stores
import requests

## Utility Functions

In [2]:
def convert_weights(weight):
    """
    @desc: normalise a "<number><unit>" weight string to kilograms with 3 decimal places
    @inputs:
        [1] weight : string such as '500g', '250ml', '2l', '16oz' or '1.5kg'
    @returns: '<value>kg' with the value formatted to 3 d.p. for g / ml / l / oz;
              the 3 d.p. value with its original unit for any other unit;
              the input unchanged when it does not start with a number followed by letters
    """
    parsed = re.match(r"([\d.]+)([a-zA-Z]+)", weight)
    # nothing recognisable at the start of the string -> hand it back untouched
    if not parsed:
        return weight

    amount = float(parsed.group(1))
    symbol = parsed.group(2)

    if symbol in ('g', 'ml'):
        # grams and millilitres are both divided by 1000 (ml is treated 1:1 with g)
        amount /= 1000
        symbol = 'kg'
    elif symbol == 'l':
        # litres are relabelled as kilograms without rescaling
        symbol = 'kg'
    elif symbol == 'oz':
        amount *= 0.0283495
        symbol = 'kg'

    # force the output to be 3 d.p
    return f'{amount:.3f}{symbol}'
    

def mullexp_to_netresult(in_exp):
    """
    @desc: collapse a multiplication expression such as '12 x 100g' into its net value ('1200g')
    @inputs:
        [1] in_exp : weight string, optionally of the form '<count> x <value><unit>'
    @returns: '<count * value><unit>' when the string starts with such an expression,
              otherwise the input string unchanged
    """
    match = re.match(r'(\d+)\s*x\s*(\d+)([a-zA-Z]+)', in_exp)
    # Any string that is not a multiplication expression is returned as-is. This includes
    # strings that merely contain the letter 'x', which previously fell through and
    # produced None.
    if match is None:
        return in_exp

    multiplier = int(match.group(1))
    value = int(match.group(2))
    unit = match.group(3)
    # Perform the multiplication and append the unit
    return str(multiplier * value) + unit
    
def is_alpha(in_str):
    """
    @desc: function to check if the input contains at least one alphabetic character
    @returns: True as soon as one alphabetic character is found, False otherwise (including for '')
    """
    for character in in_str:
        if character.isalpha():
            return True
    return False

def is_alphanumeric(in_str):
    """
    @desc: function to check if the whole input is made only of ASCII letters, digits and underscores
    @returns: True when every character is in [a-zA-Z0-9_] (the empty string also counts), else False
    """
    whole_match = re.fullmatch(r'^[a-zA-Z0-9_]*$', in_str)
    return whole_match is not None

def has_yyyy_mm_dd_format(in_str):
    """
    @desc: function to decide if a string is laid out as yyyy-mm-dd (4 digits, 2 digits, 2 digits)
    @returns: True when the whole string has that layout; the digits are not checked for being a real date
    """
    layout_match = re.fullmatch(r'\d{4}-\d{2}-\d{2}', in_str)
    return layout_match is not None

def convert_date_to_yyyy_mm_dd(in_column : pd.core.series.Series):
    """
    @desc: function to convert a column of free-form date strings to pandas datetimes
    @inputs:
        [1] in_column : series whose entries are date strings in arbitrary formats
    @returns: the series converted with pd.to_datetime; entries that cannot be parsed become NaT
    """
    def _parse_or_nat(raw_value):
        # dateutil raises on unparseable or non-string input; map those entries to NaT
        # so that errors='coerce' below can actually take effect instead of the whole
        # conversion aborting on the first bad value
        try:
            return parse(raw_value)
        except (TypeError, ValueError, OverflowError):
            return pd.NaT

    parsed_column = in_column.apply(_parse_or_nat)
    # infer_datetime_format is deprecated and had no effect on already-parsed values
    return pd.to_datetime(parsed_column, errors='coerce')

# Milestone 2

## Task 3 -- Users Data

In [None]:
# Test code for Psycopg2
# conn = psycopg2.connect(
#     host=ENDPOINT,
#     port=PORT,
#     database=DATABASE,
#     user=USER,
#     password=PASSWORD
# )
yaml_file_path = '../db_creds.yaml'

# Read the YAML file and store its contents in a Python data structure (dictionary)
with open(yaml_file_path, 'r') as file:
    yaml_data = yaml.safe_load(file)

# credentials
DATABASE_TYPE = 'postgresql'
# DBAPI = 'psycopg2'
ENDPOINT = yaml_data['RDS_HOST']
USER = yaml_data['RDS_USER']
PASSWORD = yaml_data['RDS_PASSWORD']
PORT = yaml_data['RDS_PORT']
DATABASE = yaml_data['RDS_DATABASE']

# setup sql engine and connect
sql_engine = create_engine(f"{DATABASE_TYPE}://{USER}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE}")
sql_connection = sql_engine.connect()

# metadata holds the collection of table info (names, columns, data types, schema names),
# see https://docs.sqlalchemy.org/en/20/core/metadata.html
metadata = MetaData()
# reflect() loads the definitions of the tables that exist in the database into `metadata`
metadata.reflect(sql_engine)
table_names = metadata.tables.keys()

# names of all reflected tables as a plain list
names_ = list(table_names)

# read table
# `autoload=True` was removed in SQLAlchemy 2.0 (passing it raises); `autoload_with` alone
# requests reflection and is accepted by both 1.4 and 2.0
users_table = sql_connection.execute(select(Table('legacy_users', metadata, autoload_with=sql_engine)))
headers = users_table.keys()
users_df = pd.DataFrame(users_table.fetchall(), columns=headers)

In [None]:
# name of the source table, interpolated into the messages of the null-check step below
table_name = 'legacy_users'

In [None]:
# check for missing values; isnull() and isna() are aliases, so a single count covers
# both NULLs and NaNs (alternative: df.info())
missing_count = int(users_df.isnull().sum().sum())
if missing_count:
    # a bare string cannot be raised (that is itself a TypeError), so raise a real exception
    raise ValueError(f"The database : {table_name}, has total {missing_count} NULL values and {missing_count} NaN values")
else:
    print(f"[usrmsg] No NULLs or NaNs found in {table_name}")

# drop every row in which any column contains the literal text 'NULL'
users_df_processed = users_df[~users_df.apply(lambda row: row.astype(str).str.contains('NULL').any(), axis=1)]

# check for data types
#   -1) always begin with dropping duplicates
users_df_processed = users_df_processed.drop_duplicates()
#   -2) set all columns except index to be of string format
str_convert_dict = {col: 'string' for col in users_df_processed.columns if col not in ['index']}
users_df_processed = users_df_processed.astype(str_convert_dict)
#   -3) remove all rows whose email_address is purely alphanumeric (letters, digits, underscore)
#       .copy() makes the filtered frame independent so the column assignments below
#       do not write into a slice (SettingWithCopyWarning)
users_df_processed = users_df_processed[~users_df_processed['email_address'].apply(is_alphanumeric)].copy()
#   -4) DoB and join_date should be of datetime type
users_df_processed['date_of_birth'] = convert_date_to_yyyy_mm_dd(users_df_processed['date_of_birth'])
users_df_processed['join_date'] = convert_date_to_yyyy_mm_dd(users_df_processed['join_date'])
#   -5) convert all 'GGB' country code to 'GB'
users_df_processed['country_code'] = users_df_processed['country_code'].str.replace('GGB', 'GB', regex=False)


In [None]:
# preview the first five rows of the cleaned users frame
# NOTE(review): the frame has email_address and date_of_birth columns - clear this output before sharing the notebook
users_df_processed.head(5)

In [None]:
# load the local database credentials from the YAML file into a dictionary
yaml_file_path = '../local_db_creds.yaml'
with open(yaml_file_path, 'r') as file:
    yaml_data = yaml.safe_load(file)

# credentials, read from the dictionary in host / user / password / port / database order
DATABASE_TYPE = 'postgresql+psycopg2'
ENDPOINT, USER, PASSWORD, PORT, DATABASE = (
    yaml_data[key]
    for key in ('LOCAL_HOST', 'LOCAL_USER', 'LOCAL_PASSWORD', 'LOCAL_PORT', 'LOCAL_DATABASE')
)

# engine for the local database, plus an open connection to it
engine = create_engine(f"{DATABASE_TYPE}://{USER}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE}")
connection = engine.connect()

In [None]:
# write the cleaned users to public.dim_users, replacing the table if it already exists
users_df_processed.to_sql("dim_users", engine, schema='public', if_exists='replace', index=False)

## Task 4 -- Card Data

In [None]:
# extract the tables of every page of the PDF (tabula returns one frame per table)
pdf_path = "../card_details.pdf"
card_df_list = tabula.read_pdf(pdf_path, pages='all', stream=True)
# stack the per-table frames into a single frame with a fresh 0..n-1 index
card_df = pd.concat(objs=card_df_list, ignore_index=True)

In [None]:
#   -1) drop columns that are filled with missing and/or incorrect information
card_processed_df = card_df.drop(columns=['Unnamed: 0'])
#   -2) always begin with dropping duplicates
card_processed_df = card_processed_df.drop_duplicates()
#   -3) remove rows in which any column contains the literal text "NULL"
card_processed_df = card_processed_df[~card_processed_df.apply(lambda row: row.astype(str).str.contains('NULL').any(), axis=1)]
#   -4) remove all rows whose card_provider is purely alphanumeric
card_processed_df = card_processed_df[~card_processed_df['card_provider'].apply(is_alphanumeric)]
#   -5) rows that have NaN in the merged 'card_number expiry_date' column: rebuild it from the two separate columns
#       (.copy() so the assignment targets an independent frame, not a slice of card_processed_df)
nan_card_num_expiry_date_df = card_processed_df[card_processed_df['card_number expiry_date'].isna()].copy()
nan_card_num_expiry_date_df['card_number expiry_date'] = nan_card_num_expiry_date_df['card_number'].astype(str) + ' ' + nan_card_num_expiry_date_df['expiry_date'].astype(str)
#   -6) rows that DO have a merged value: split it on the first whitespace and use the parts
#       to fill the separate card_number and expiry_date columns
not_nan_card_num_expiry_date_df = card_processed_df[~card_processed_df['card_number expiry_date'].isna()].copy()
splitted_cardnumexpdate_df = not_nan_card_num_expiry_date_df['card_number expiry_date'].str.split(n=1, expand=True)
not_nan_card_num_expiry_date_df['card_number'] = splitted_cardnumexpdate_df[0]
not_nan_card_num_expiry_date_df['expiry_date'] = splitted_cardnumexpdate_df[1]
#   -7) combine the two groups again (NaN-merged rows first, then the split rows)
card_processed_df = pd.concat([nan_card_num_expiry_date_df, not_nan_card_num_expiry_date_df], ignore_index=True)
del nan_card_num_expiry_date_df, not_nan_card_num_expiry_date_df
#   -8) change all columns to string
card_processed_df = card_processed_df.astype('string')
#   -9) finally, change all date columns to datetime
card_processed_df['date_payment_confirmed'] = convert_date_to_yyyy_mm_dd(card_processed_df['date_payment_confirmed'])
#       expiry_date is given as mm/yy; MonthEnd(0) rolls it forward to the last day of that month
card_processed_df['expiry_date'] = pd.to_datetime(card_processed_df['expiry_date'], format='%m/%y') + pd.offsets.MonthEnd(0)

In [None]:
# load the local database credentials from the YAML file into a dictionary
yaml_file_path = '../local_db_creds.yaml'
with open(yaml_file_path, 'r') as file:
    yaml_data = yaml.safe_load(file)

# credentials, read from the dictionary in host / user / password / port / database order
DATABASE_TYPE = 'postgresql+psycopg2'
ENDPOINT, USER, PASSWORD, PORT, DATABASE = (
    yaml_data[key]
    for key in ('LOCAL_HOST', 'LOCAL_USER', 'LOCAL_PASSWORD', 'LOCAL_PORT', 'LOCAL_DATABASE')
)

# connect to the local database and write the cleaned card details, replacing any existing table
engine = create_engine(f"{DATABASE_TYPE}://{USER}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE}")
connection = engine.connect()
card_processed_df.to_sql(name="dim_card_details", con=engine, index=False, if_exists='replace')

## Task 5 -- Store Related Data

In [None]:
# the API key lives in a YAML file; load it into a dictionary
yaml_file_path = '../api_key.yaml'
with open(yaml_file_path, mode='r') as file:
    yaml_data = yaml.safe_load(file)


# Endpoints of the stores API
#   - retrieve a store: https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/store_details/{store_number}
#   - return the number of stores: https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/number_stores

- Common methods include `GET` (retrieve a resource), `POST` (create a resource), `PUT` (update a resource), and `DELETE` (remove a resource). These are discussed in more detail later.

- **Status Codes**: HTTP status codes indicate the result of the request. These codes range from informational (`1xx`) to success (`2xx`), redirection (`3xx`), client errors (`4xx`), and server errors (`5xx`).   

- **Port**: HTTPS typically uses port 443 for communication, while HTTP uses port 80. The use of a different port helps differentiate between secure and non-secure connections.

- The request contains information such as the `HTTP method`, `the endpoint URL`, optional `headers`, and, in some cases, a `payload` or `body`.

- **HTTP Method**: The HTTP method, also known as the HTTP verb, specifies the action to be performed on the resource. Common HTTP methods include `GET` (retrieve data), `POST` (create data), `PUT` (update data), and `DELETE` (remove data).

- **Endpoint URL**: The **endpoint URL** represents the specific resource or functionality on the server that the client wants to interact with. It typically follows a specific URL pattern defined by the API.

- **Headers**: Headers provide additional information about the request, such as content type, authorization tokens, or caching directives. We will learn more about headers in a later lesson.

- **Payload or Body**: In certain cases, requests may include a payload or body `containing data to be sent to the server`. This is common for methods like `POST` or `PUT`, where the payload contains the data to create or update a resource.

* Endpoint URLs usually have the following format:
```
<ROOT_URL>/<Path>?<Query Parameters>
```

In this structure:
- `<ROOT_URL>` represents the base URL of the API
- `<Path>` refers to the specific path or endpoint within the API that offers a specific service
- `<Query Parameters>` are optional parameters passed in the URL query string, allowing for additional customization or filtering of the request

After the path section of the endpoint URL, one or more *parameters* can be specified

* The `?` symbol separates the endpoint path from the start of the parameters.
* Multiple parameters are separated from one another with the `&` symbol.


In [None]:
# number of stores to request (store indices 0 .. store_number - 1) and the list collecting the responses
store_number = 450
store_detail = []

get_all_stores_url = "https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/number_stores"
# the API key is sent with every request in the X-API-KEY header
headers = {
    "X-API-KEY": yaml_data["API_KEY"]
}

for i in range(store_number):
    get_store_number_url = f"https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/store_details/{i}"
    # timeout so that one unresponsive request cannot hang the whole loop indefinitely
    response = requests.get(get_store_number_url, headers=headers, timeout=30)
    if response.status_code == 200:
        # Access the response data as JSON
        store_detail.append(response.json())

    else:
        print(f"Request failed with status code: {response.status_code}")
        print(f"Response Text: {response.text}")
# every requested store must have been retrieved; compare against store_number rather than a second hard-coded 450
assert len(store_detail) == store_number, f"expected {store_number} stores, retrieved {len(store_detail)}"

In [None]:
# one row per store response; keep the raw frame untouched and clean a deep copy of it
store_detail_df = pd.DataFrame(data=store_detail)
store_detail_df_copy = store_detail_df.copy(deep=True)

In [None]:
#   -1) remove the "lat" and "address" columns
#       NOTE(review): the original comment describes these as purely nan / none columns - confirm this holds for "address"
store_detail_df_copy = store_detail_df_copy.drop(columns=["lat", "address"])
#   -2) remove all rows whose opening_date is purely alphanumeric
#       (.copy() so the column assignments below target an independent frame, not a slice)
no_alphanum_df = store_detail_df_copy[~store_detail_df_copy['opening_date'].apply(is_alphanumeric)].copy()
#   -3) account for missing addresses, longitude and latitude values
# --> ANS) No need to change, it is a portal type store, and only one and unique in the table
#   -4) remove all letters in the staff_numbers column
no_alphanum_df["staff_numbers"] = no_alphanum_df["staff_numbers"].str.replace(r'[a-zA-Z]', '', regex=True)
#   -5) fix format of opening_date
no_alphanum_df["opening_date"] = convert_date_to_yyyy_mm_dd(no_alphanum_df["opening_date"])
#   -6) set eeEurope and eeAmerica to Europe and America in the continent column
no_alphanum_df["continent"] = no_alphanum_df["continent"].str.replace('eeEurope', 'Europe')
no_alphanum_df["continent"] = no_alphanum_df["continent"].str.replace('eeAmerica', 'America')
#   -7) convert the remaining columns to string, and longitude / latitude / staff_numbers to numeric
#       (values that are not numbers become NaN because of errors='coerce')
no_alphanum_df = no_alphanum_df.astype({col: 'string' for col in no_alphanum_df.columns if col not in ["index", "opening_date", "longitude", "staff_numbers", "latitude"]})
# no_alphanum_df = no_alphanum_df.astype({col: 'float64' for col in no_alphanum_df.columns if col in ["longitude", "latitude"]})
no_alphanum_df["longitude"] = pd.to_numeric(no_alphanum_df["longitude"], errors='coerce')
no_alphanum_df["latitude"] = pd.to_numeric(no_alphanum_df["latitude"], errors='coerce')
no_alphanum_df["staff_numbers"] = pd.to_numeric(no_alphanum_df["staff_numbers"], errors='coerce')

In [None]:
# load the local database credentials from the YAML file into a dictionary
yaml_file_path = '../local_db_creds.yaml'
with open(yaml_file_path, 'r') as file:
    yaml_data = yaml.safe_load(file)

# credentials, read from the dictionary in host / user / password / port / database order
DATABASE_TYPE = 'postgresql+psycopg2'
ENDPOINT, USER, PASSWORD, PORT, DATABASE = (
    yaml_data[key]
    for key in ('LOCAL_HOST', 'LOCAL_USER', 'LOCAL_PASSWORD', 'LOCAL_PORT', 'LOCAL_DATABASE')
)

# connect to the local database and write the cleaned store details, replacing any existing table
engine = create_engine(f"{DATABASE_TYPE}://{USER}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE}")
connection = engine.connect()
no_alphanum_df.to_sql(name="dim_store_details", con=engine, index=False, if_exists='replace')

## Task 6 -- Products Data

* There is a NaN in `category`
* There are alphanumeric junk entries in `category`
* The leading letters in `product_code` need to be capitalised
* There are NaNs and alphanumeric junk entries in `product_price` and `removed`

> Special instructions for `weight` (handle this column last)
* Some entries are multiplication expressions (e.g. `12 x 100g`) that must be evaluated first
* Values in 'g' need to be converted to 'kg'
* All numbers should be formatted consistently to 3 decimal places

`course instructions`
* Convert them all to a decimal value representing their weight in kg. Use a 1:1 ratio of ml to g as a rough estimate for the rows containing ml.
* Develop the method to clean up the weight column and remove all excess characters then represent the weights as a float.

In [None]:
# s3 = boto3.client('s3')
# s3.download_file('data-handling-public', 'products.csv', '../products_data.csv')

In [None]:
# read the downloaded products file; keep the raw frame untouched and clean a deep copy of it
products_org_df = pd.read_csv('../products_data.csv')
products_org_copy_df = products_org_df.copy(deep=True)

In [None]:
#   -1) always begin by dropping duplicates
products_org_copy_df = products_org_copy_df.drop_duplicates()
#   -2) rename the Unnamed column to index
products_org_copy_df = products_org_copy_df.rename(columns={'Unnamed: 0': 'index'})
#   -3) fill nans with empty strings to allow easier processing for the rest of the cleaning
products_org_copy_df = products_org_copy_df.fillna('')
#   -4) remove all rows whose date_added is purely alphanumeric
#       (.copy() so the column assignments below target an independent frame, not a slice)
products_org_copy_df = products_org_copy_df[~products_org_copy_df['date_added'].apply(is_alphanumeric)].copy()
#   -5) for weights : a) compute all multiplication expressions and replace with resultant value
products_org_copy_df["weight"] = products_org_copy_df["weight"].apply(mullexp_to_netresult)
#   -6) for weights : b) standardise them to 'kg'
products_org_copy_df["weight"] = products_org_copy_df["weight"].apply(convert_weights)
#   -7) drop £ and kg to enable weight and product price columns to be numeric;
#       the unit moves into the column name instead (literal replacement, no regex needed)
products_org_copy_df["product_price"] = products_org_copy_df["product_price"].str.replace('£', '', regex=False)
products_org_copy_df = products_org_copy_df.rename(columns={'product_price': 'product_price (£)'})
products_org_copy_df["weight"] = products_org_copy_df["weight"].str.replace('kg', '', regex=False)
products_org_copy_df = products_org_copy_df.rename(columns={'weight': 'weight (kg)'})
#   -8) set product_price, weight, EAN to be numeric (values that are not numbers become NaN)
products_org_copy_df["product_price (£)"] = pd.to_numeric(products_org_copy_df["product_price (£)"], errors='coerce')
products_org_copy_df["weight (kg)"] = pd.to_numeric(products_org_copy_df["weight (kg)"], errors='coerce')
products_org_copy_df["EAN"] = pd.to_numeric(products_org_copy_df["EAN"], errors='coerce')
#   -9) set product_name, category, uuid, removed and product_code to string  AND  date_added to datetime
products_org_copy_df = products_org_copy_df.astype({"product_name" : "string", "category" : "string", "uuid" : "string", "removed" : "string", "product_code" : "string"})
products_org_copy_df['date_added'] = convert_date_to_yyyy_mm_dd(products_org_copy_df['date_added'])

In [None]:
# load the local database credentials from the YAML file into a dictionary
yaml_file_path = '../local_db_creds.yaml'
with open(yaml_file_path, 'r') as file:
    yaml_data = yaml.safe_load(file)

# credentials, read from the dictionary in host / user / password / port / database order
DATABASE_TYPE = 'postgresql+psycopg2'
ENDPOINT, USER, PASSWORD, PORT, DATABASE = (
    yaml_data[key]
    for key in ('LOCAL_HOST', 'LOCAL_USER', 'LOCAL_PASSWORD', 'LOCAL_PORT', 'LOCAL_DATABASE')
)

# connect to the local database and write the cleaned products, replacing any existing table
engine = create_engine(f"{DATABASE_TYPE}://{USER}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE}")
connection = engine.connect()
products_org_copy_df.to_sql(name="dim_products", con=engine, index=False, if_exists='replace')

## Task 7 -- Orders Data

In [None]:
# load the RDS credentials from the YAML file into a dictionary
yaml_file_path = '../db_creds.yaml'
with open(yaml_file_path, 'r') as file:
    yaml_data = yaml.safe_load(file)

# credentials, read from the dictionary in host / user / password / port / database order
DATABASE_TYPE = 'postgresql'
# DBAPI = 'psycopg2'
ENDPOINT, USER, PASSWORD, PORT, DATABASE = (
    yaml_data[key]
    for key in ('RDS_HOST', 'RDS_USER', 'RDS_PASSWORD', 'RDS_PORT', 'RDS_DATABASE')
)

# setup sql engine and connect
sql_engine = create_engine(f"{DATABASE_TYPE}://{USER}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE}")
sql_connection = sql_engine.connect()

# metadata holds the collection of table info (names, columns, data types, schema names),
# see https://docs.sqlalchemy.org/en/20/core/metadata.html
metadata = MetaData()
# load the definitions of the tables that exist in the database into `metadata`
metadata.reflect(bind=sql_engine)
table_names = metadata.tables.keys()

# names of all reflected tables as a plain list
names_ = [*table_names]

In [None]:
# read table
# `autoload=True` was removed in SQLAlchemy 2.0 (passing it raises); `autoload_with` alone
# requests reflection and is accepted by both 1.4 and 2.0
orders_table = sql_connection.execute(select(Table('orders_table', metadata, autoload_with=sql_engine)))
headers = orders_table.keys()
orders_org_df = pd.DataFrame(orders_table.fetchall(), columns=headers)

In [None]:
# work on a deep copy so the frame read from the database stays untouched
order_processed_df = orders_org_df.copy(deep=True)

In [None]:
#   -1) remove the columns first_name, last_name and 1
columns_to_remove = ['first_name', 'last_name', '1']
order_processed_df = order_processed_df.drop(columns=columns_to_remove)

In [None]:
# load the local database credentials from the YAML file into a dictionary
yaml_file_path = '../local_db_creds.yaml'
with open(yaml_file_path, 'r') as file:
    yaml_data = yaml.safe_load(file)

# credentials, read from the dictionary in host / user / password / port / database order
DATABASE_TYPE = 'postgresql+psycopg2'
ENDPOINT, USER, PASSWORD, PORT, DATABASE = (
    yaml_data[key]
    for key in ('LOCAL_HOST', 'LOCAL_USER', 'LOCAL_PASSWORD', 'LOCAL_PORT', 'LOCAL_DATABASE')
)

# connect to the local database and write the orders, replacing any existing table
engine = create_engine(f"{DATABASE_TYPE}://{USER}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE}")
connection = engine.connect()
order_processed_df.to_sql(name="orders_table", con=engine, index=False, if_exists='replace')

## Task 8 -- Date Events Data

In [None]:
# Source of the date events data (a bare URL is not valid Python, so it is kept as a comment):
# https://data-handling-public.s3.eu-west-1.amazonaws.com/date_details.json

In [None]:
# download date_details.json from the data-handling-public bucket next to the notebook's parent folder
s3 = boto3.client('s3')
s3.download_file(
    Bucket='data-handling-public',
    Key='date_details.json',
    Filename='../date_details.json',
)

In [3]:
# load the downloaded JSON file into a frame
events_df = pd.read_json(path_or_buf='../date_details.json')

In [4]:
#   -1) drop duplicates, working on a copy of the original
events_df_processed = events_df.copy().drop_duplicates()
#   -2) remove all rows whose date_uuid is purely alphanumeric
#       (.copy() so the column assignments below target an independent frame, not a slice)
events_df_processed = events_df_processed[~events_df_processed["date_uuid"].apply(is_alphanumeric)].copy()
#   -3) use info from year, month, day and timestamp to set a separate datetime column
events_df_processed['datetime'] = pd.to_datetime(events_df_processed[['year', 'month', 'day', 'timestamp']].astype(str).agg(' '.join, axis=1), format='%Y %m %d %H:%M:%S')
#   -4) set timestamp, time_period and date_uuid as string
events_df_processed = events_df_processed.astype({"timestamp" : "string", "time_period" : "string", "date_uuid" : "string"})
#   -5) set month, year and day as numeric (pd.to_numeric picks the dtype; values that are not numbers become NaN)
events_df_processed["month"] = pd.to_numeric(events_df_processed["month"], errors='coerce')
events_df_processed["year"] = pd.to_numeric(events_df_processed["year"], errors='coerce')
events_df_processed["day"] = pd.to_numeric(events_df_processed["day"], errors='coerce')

In [7]:
# load the local database credentials from the YAML file into a dictionary
yaml_file_path = '../local_db_creds.yaml'
with open(yaml_file_path, 'r') as file:
    yaml_data = yaml.safe_load(file)

# credentials, read from the dictionary in host / user / password / port / database order
DATABASE_TYPE = 'postgresql+psycopg2'
ENDPOINT, USER, PASSWORD, PORT, DATABASE = (
    yaml_data[key]
    for key in ('LOCAL_HOST', 'LOCAL_USER', 'LOCAL_PASSWORD', 'LOCAL_PORT', 'LOCAL_DATABASE')
)

# connect to the local database and write the cleaned date events, replacing any existing table
engine = create_engine(f"{DATABASE_TYPE}://{USER}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE}")
connection = engine.connect()
events_df_processed.to_sql(name="dim_date_times", con=engine, index=False, if_exists='replace')

# Finalizing Code

## Database Connector

In [19]:
"""
@reqs:
    [1] PyYAML : !pip install PyYAML
    [2] psycopg : !pip install psycopg2
"""
from sqlalchemy import create_engine, MetaData
import pandas as pd
import yaml

class DatabaseConnector:
    """
    @desc : use to connect with and upload data to the database.

    Credentials for the source (RDS) database are read from '../db_creds.yaml' and
    credentials for the local target database from '../local_db_creds.yaml'.
    """
    def __init__(self):
        pass


    def read_db_creds(self, yaml_file_path : str = '../db_creds.yaml'):
        """
        @desc: read a YAML credentials file and return its contents as a dictionary
        @inputs:
            [1] yaml_file_path : path of the YAML file, defaults to the RDS credentials file
        @reqs: install PyYAML: pip install PyYAML
        """
        # Read the YAML file and store its contents in a Python data structure (dictionary)
        with open(yaml_file_path, 'r') as file:
            return yaml.safe_load(file)


    def init_db_engine(self):
        """
        @desc: build a SQLAlchemy engine for the RDS database from the stored credentials
        @returns: the engine; no connection is opened until the engine is first used
        """
        yaml_data = self.read_db_creds()
        DATABASE_TYPE = 'postgresql'
        ENDPOINT = yaml_data['RDS_HOST']
        USER = yaml_data['RDS_USER']
        PASSWORD = yaml_data['RDS_PASSWORD']
        PORT = yaml_data['RDS_PORT']
        DATABASE = yaml_data['RDS_DATABASE']

        return create_engine(f"{DATABASE_TYPE}://{USER}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE}")

    def list_db_tables(self):
        """
        @desc: list the names of the tables that exist in the RDS database
        @returns: list of table names
        """
        sql_engine = self.init_db_engine()
        try:
            # metadata, holds collection of table info, their data types, names;
            # reflect() opens (and releases) its own connection, so no explicit connect() is needed
            metadata = MetaData()
            metadata.reflect(sql_engine)
            # return the table names in a list format
            return list(metadata.tables.keys())
        finally:
            # release the pooled connections of this throw-away engine
            sql_engine.dispose()


    def upload_to_db(self, in_df : pd.DataFrame, table_name : str):
        """
        @desc: write a DataFrame to the local database, replacing the table if it already exists
        @inputs:
            [1] in_df : the frame to upload (its index is not written)
            [2] table_name : name of the target table
        """
        # Read the YAML file and store its contents in a Python data structure (dictionary)
        yaml_data = self.read_db_creds('../local_db_creds.yaml')

        # setup credentials
        DATABASE_TYPE = 'postgresql+psycopg2'
        ENDPOINT = yaml_data['LOCAL_HOST']
        USER = yaml_data['LOCAL_USER']
        PASSWORD = yaml_data['LOCAL_PASSWORD']
        PORT = yaml_data['LOCAL_PORT']
        DATABASE = yaml_data['LOCAL_DATABASE']

        # connect and upload, index=False iff there exists a separate column for index;
        # to_sql manages its own connection when given an engine
        engine = create_engine(f"{DATABASE_TYPE}://{USER}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE}")
        try:
            in_df.to_sql(table_name, engine, if_exists='replace', index=False)
        finally:
            # release the pooled connections of this throw-away engine
            engine.dispose()

## Data Extraction

In [30]:
from sqlalchemy import create_engine, MetaData, Table, select
import pandas as pd
import requests
import tabula
import boto3
import yaml

class DataExtractor(DatabaseConnector):
    """
    @desc : utility class with methods that extract data from different data sources.

    Each method is fit to one particular source: an RDS database table, a PDF document,
    the stores API, and files in an S3 bucket.
    """
    # seconds to wait for a single API response before giving up
    _REQUEST_TIMEOUT = 30

    def read_rds_table(self, table_name='legacy_users'):
        """
        @desc:  extract the database table to a pandas DataFrame.
        @inputs:
            [1] table_name : the table that needs to be inspected
        @returns: DataFrame with one row per table row and the table's column names as headers
        """
        sql_engine = self.init_db_engine()
        try:
            # MetaData.reflect() works in place and returns None, so the MetaData object
            # itself has to be handed to Table; autoload_with reflects just this one table
            # (`autoload=True` is no longer accepted by SQLAlchemy 2.0)
            rds_table = Table(table_name, MetaData(), autoload_with=sql_engine)
            with sql_engine.connect() as sql_connection:
                result = sql_connection.execute(select(rds_table))
                return pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        finally:
            sql_engine.dispose()

    def retrieve_pdf_data(self, link2pdf : str):
        """
        @desc: given the link to the pdf document, this function will return the pd.Dataframe of that doc
        """
        # tabula returns one frame per extracted table; stack them under a fresh index
        return pd.concat(tabula.read_pdf(link2pdf, stream=True, pages='all'), ignore_index=True)

    def list_number_of_stores(self, header : dict, endpoint : str = "https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/number_stores"):
        """
        @desc: retrieves the store numbers data from the stores API, based on the input header and endpoint
        @returns: the value of 'number_stores' from the response, or None when the request fails
        """
        response = requests.get(endpoint, headers=header, timeout=self._REQUEST_TIMEOUT)

        if response.status_code == 200:
            # Access the response data as JSON and extract the number of stores
            num_stores = response.json()['number_stores']
            print(f"Number of Stores: {num_stores}")
            return num_stores

        print(f"Request failed with status code: {response.status_code}")
        print(f"Response Text: {response.text}")
        return None


    def retrieve_stores_data(self, retrieve_store_endpoint : str = "https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/store_details/", store_number : int = 450):
        """
        @desc: given the retrieve a store endpoint, this function will return the data from all the stores as a pd.Dataframe
        @inputs:
            [1] retrieve_store_endpoint : base URL, the store index is appended to it
            [2] store_number : how many stores to request (indices 0 .. store_number - 1)
        @raises: AssertionError when fewer than store_number stores could be retrieved
        """
        # list to hold all of the responses
        store_detail = []

        # Read the YAML file and store its contents in a Python data structure (dictionary)
        yaml_file_path = '../api_key.yaml'
        with open(yaml_file_path, 'r') as file:
            yaml_data = yaml.safe_load(file)

        # setup the API key header
        headers = {
            "X-API-KEY": yaml_data["API_KEY"]
        }

        # loop through the store numbers and store the detail of the individual stores
        for i in range(store_number):
            url = retrieve_store_endpoint + str(i)
            response = requests.get(url, headers=headers, timeout=self._REQUEST_TIMEOUT)
            if response.status_code == 200:
                store_detail.append(response.json())

            else:
                print(f"Request failed with status code: {response.status_code}")
                print(f"Response Text: {response.text}")
        assert len(store_detail) == store_number, f"expected {store_number} stores, retrieved {len(store_detail)}"
        return pd.DataFrame(store_detail)


    def extract_from_s3(self):
        """
        @desc: retrieves the products.csv table from the S3 bucket at s3://data-handling-public/products.csv
        @returns: the downloaded file ('../products_data.csv') read into a DataFrame
        """
        s3 = boto3.client('s3')
        s3.download_file('data-handling-public', 'products.csv', '../products_data.csv')
        return pd.read_csv('../products_data.csv')


    def extract_eventstable_from_s3(self):
        """
        @desc: retrieves the date_details.json table from the S3 bucket at https://data-handling-public.s3.eu-west-1.amazonaws.com/date_details.json
        @returns: the cleaned events DataFrame (duplicates and junk rows removed, a combined 'datetime' column added)
        """
        s3 = boto3.client('s3')
        s3.download_file('data-handling-public', 'date_details.json', '../date_details.json')
        events_df = pd.read_json('../date_details.json')

        #   -1) drop duplicates, working on a copy of the original
        events_df_processed = events_df.copy().drop_duplicates()
        #   -2) remove all rows whose date_uuid is purely alphanumeric
        #       (.copy() so the column assignments below target an independent frame, not a slice)
        events_df_processed = events_df_processed[~events_df_processed["date_uuid"].apply(is_alphanumeric)].copy()
        #   -3) use info from year, month, day and timestamp to set a separate datetime column
        events_df_processed['datetime'] = pd.to_datetime(events_df_processed[['year', 'month', 'day', 'timestamp']].astype(str).agg(' '.join, axis=1), format='%Y %m %d %H:%M:%S')
        #   -4) set timestamp, time_period and date_uuid as string
        events_df_processed = events_df_processed.astype({"timestamp" : "string", "time_period" : "string", "date_uuid" : "string"})
        #   -5) set month, year and day as numeric (pd.to_numeric picks the dtype; values that are not numbers become NaN)
        events_df_processed["month"] = pd.to_numeric(events_df_processed["month"], errors='coerce')
        events_df_processed["year"] = pd.to_numeric(events_df_processed["year"], errors='coerce')
        events_df_processed["day"] = pd.to_numeric(events_df_processed["day"], errors='coerce')

        return events_df_processed

## Data Cleaner

In [12]:
import pandas as pd
import re
from dateutil.parser import parse
class DataCleaning:
    """
    @desc : methods to clean data from each of the data sources

    The static helpers are always reached through the class (self.<helper>) so the
    class does not depend on same-named module-level functions existing in the kernel.
    """

    # ================================================================== #
    #                   UTILITY FUNCTIONS
    # ================================================================== #
    @staticmethod
    def convert_weights(weight):
        """
        @desc: standardise a weight string that starts with <number><unit> to kilograms
        @param weight: string such as '500g', '250ml', '2l' or '16oz'
        @return: the value formatted to 3 d.p. followed by the unit (e.g. '0.500kg');
                 g and ml are divided by 1000, l is relabelled as kg, oz is multiplied
                 by 0.0283495; other units keep their label. Input that does not match
                 the pattern, or whose numeric part is not a valid float, is returned unchanged.
        """
        match = re.match(r"([\d.]+)([a-zA-Z]+)", weight)
        if not match:
            return weight
        value, unit = match.groups()
        try:
            value = float(value)
        except ValueError:
            # the pattern also accepts things like '1.2.3' or '.', which are not numbers
            return weight
        if unit in ('g', 'ml'):
            value /= 1000
            unit = 'kg'
        elif unit == 'l':
            unit = 'kg'
        elif unit == 'oz':
            value *= 0.0283495
            unit = 'kg'
        # force the output to be 3 d.p
        return f'{value:.3f}{unit}'

    @staticmethod
    def mullexp_to_netresult(in_exp):
        """
        @desc: collapse a multiplication expression such as '12 x 100g' into its product ('1200g')
        @param in_exp: string that may start with '<int> x <int><unit>'
        @return: '<product><unit>' when the expression matches, otherwise in_exp unchanged
                 (including strings that merely contain an 'x', which previously produced None)
        """
        match = re.match(r'(\d+)\s*x\s*(\d+)([a-zA-Z]+)', in_exp)
        if match is None:
            return in_exp
        multiplier = int(match.group(1))
        value = int(match.group(2))
        unit = match.group(3)
        return str(multiplier * value) + unit

    @staticmethod
    def is_alpha(in_str):
        """
        @desc: function to check if the entry contains at least one alphabetic character
        """
        return any(c.isalpha() for c in in_str)

    @staticmethod
    def is_alphanumeric(in_str):
        """
        @desc: function to check if the entry consists only of letters, digits and underscores
               (the empty string also counts as a match)
        """
        return bool(re.fullmatch(r'^[a-zA-Z0-9_]*$', in_str))

    @staticmethod
    def has_yyyy_mm_dd_format(in_str):
        """
        @desc: function to decide if an entry has exactly the date format yyyy-mm-dd
        """
        return bool(re.fullmatch(r'\d{4}-\d{2}-\d{2}', in_str))

    @staticmethod
    def convert_date_to_yyyy_mm_dd(in_column : pd.core.series.Series):
        """
        @desc: function to convert a column of free-form date strings to pandas datetimes
        @param in_column: Series of date strings in possibly mixed formats
        @return: datetime Series; strings that dateutil cannot parse become NaT
        """
        def _parse_or_nat(raw_value):
            # non-strings (NaN, None, existing timestamps) are left for pd.to_datetime to handle
            if not isinstance(raw_value, str):
                return raw_value
            try:
                return parse(raw_value)
            except (ValueError, OverflowError):
                # honour errors='coerce' below instead of aborting on the first bad string
                return pd.NaT

        # infer_datetime_format is deprecated and had no effect on already-parsed values
        return pd.to_datetime(in_column.apply(_parse_or_nat), errors='coerce')

    # ================================================================== #
    #                   PRIMARY FUNCTIONS
    # ================================================================== #
    def clean_orders_data(self, orders_df : pd.DataFrame):
        """
        @desc: pre-process the orders table
        @param orders_df: raw orders table
        @return: de-duplicated copy without the 'first_name', 'last_name' and '1' columns
        """
        order_processed_df = orders_df.drop_duplicates()
        order_processed_df = order_processed_df.drop(columns=['first_name', 'last_name', '1'])

        return order_processed_df

    def clean_user_data(self, users_df : pd.DataFrame, table_name : str = 'legacy_users'):
        """
        @desc: pre-process the legacy users table
        @param users_df: raw users table
        @param table_name: name used in the status / error messages
        @return: cleaned copy of the users table
        @raises ValueError: if users_df contains any missing (NaN/None) values
        """
        # isnull() and isna() are aliases, so a single count covers both
        missing_count = int(users_df.isna().sum().sum())
        if missing_count:
            # a bare string cannot be raised; use a real exception type
            raise ValueError(f"The database : {table_name}, has total {missing_count} NULL values and {missing_count} NaN values")
        print(f"[usrmsg] No NULLs or NaNs found in {table_name}")

        # drop every row in which any cell contains the text 'NULL' (substring match)
        has_null_text = users_df.apply(lambda row: row.astype(str).str.contains('NULL').any(), axis=1)
        users_df_processed = users_df[~has_null_text]
        #   -1) drop duplicates
        users_df_processed = users_df_processed.drop_duplicates()
        #   -2) set all columns except index to be of string format
        str_convert_dict = {col: 'string' for col in users_df_processed.columns if col not in ['index']}
        users_df_processed = users_df_processed.astype(str_convert_dict)
        #   -3) remove all entries whose email_address is pure alphanumeric;
        #       .copy() so the assignments below write to an independent frame
        is_junk_row = users_df_processed['email_address'].apply(self.is_alphanumeric)
        users_df_processed = users_df_processed[~is_junk_row].copy()
        #   -4) DoB and join_date should be datetime
        users_df_processed['date_of_birth'] = self.convert_date_to_yyyy_mm_dd(users_df_processed['date_of_birth'])
        users_df_processed['join_date'] = self.convert_date_to_yyyy_mm_dd(users_df_processed['join_date'])
        #   -5) convert all 'GGB' country code to 'GB'
        users_df_processed['country_code'] = users_df_processed['country_code'].str.replace('GGB', 'GB', regex=False)

        return users_df_processed

    def clean_card_data(self, card_df : pd.DataFrame):
        """
        @desc: pre-process the card table
        @param card_df: raw card table; must contain the columns 'Unnamed: 0', 'card_provider',
                        'card_number', 'expiry_date', 'card_number expiry_date' and
                        'date_payment_confirmed'
        @return: cleaned table with all columns as string except the two datetime columns;
                 rows are re-indexed, with the rows whose merged column was missing placed first
        """
        merged_col = 'card_number expiry_date'
        #   -1) drop the 'Unnamed: 0' column
        card_processed_df = card_df.drop(columns=['Unnamed: 0'])
        #   -2) drop duplicates
        card_processed_df = card_processed_df.drop_duplicates()
        #   -3) remove rows in which any cell contains the text 'NULL' (substring match)
        has_null_text = card_processed_df.apply(lambda row: row.astype(str).str.contains('NULL').any(), axis=1)
        card_processed_df = card_processed_df[~has_null_text]
        #   -4) remove all entries whose card_provider is pure alphanumeric
        # NOTE(review): this also removes any provider name that is a single word without
        # spaces or punctuation — confirm no valid provider has that form
        card_processed_df = card_processed_df[~card_processed_df['card_provider'].apply(self.is_alphanumeric)]
        #   -5) rows with NaN in the merged column: build it from card_number and expiry_date
        merged_is_missing = card_processed_df[merged_col].isna()
        merged_missing_df = card_processed_df[merged_is_missing].copy()
        merged_missing_df[merged_col] = merged_missing_df['card_number'].astype(str) + ' ' + merged_missing_df['expiry_date'].astype(str)
        #   -6) rows with a value in the merged column: split it back into card_number and expiry_date
        merged_present_df = card_processed_df[~merged_is_missing].copy()
        if not merged_present_df.empty:
            # guard: splitting an empty selection yields no columns to assign from
            split_parts = merged_present_df[merged_col].str.split(n=1, expand=True)
            merged_present_df['card_number'] = split_parts[0]
            merged_present_df['expiry_date'] = split_parts[1]
        #   -7) combine the two halves
        card_processed_df = pd.concat([merged_missing_df, merged_present_df], ignore_index=True)
        #   -8) change all objects to string
        card_processed_df = card_processed_df.astype('string')
        #   -9) finally, change all date columns to datetime; expiry 'mm/yy' is moved to the month end
        card_processed_df['date_payment_confirmed'] = self.convert_date_to_yyyy_mm_dd(card_processed_df['date_payment_confirmed'])
        card_processed_df['expiry_date'] = pd.to_datetime(card_processed_df['expiry_date'], format='%m/%y') + pd.offsets.MonthEnd(0)

        return card_processed_df

    def called_clean_store_data(self, store_detail_df : pd.DataFrame):
        """
        @desc: pre-process the store table
        @param store_detail_df: raw store details table
        @return: cleaned table without the 'lat' and 'address' columns, with opening_date as
                 datetime, longitude / latitude / staff_numbers numeric and the remaining
                 columns (except index) as string
        """
        #   -1) always begin with removing duplicates
        store_detail_processed_df = store_detail_df.drop_duplicates()
        #   -2) drop the lat and address columns
        store_detail_processed_df = store_detail_processed_df.drop(columns=["lat", "address"])
        #   -3) remove all rows whose opening_date is pure alphanumeric;
        #       .copy() so the assignments below write to an independent frame
        is_junk_row = store_detail_processed_df['opening_date'].apply(self.is_alphanumeric)
        store_detail_processed_df = store_detail_processed_df[~is_junk_row].copy()
        #   -4) missing longitude / latitude values are left as they are
        #   -5) remove all alphabets in staff_numbers column
        store_detail_processed_df["staff_numbers"] = store_detail_processed_df["staff_numbers"].str.replace(r'[a-zA-Z]', '', regex=True)
        #   -6) fix format of opening_date
        store_detail_processed_df["opening_date"] = self.convert_date_to_yyyy_mm_dd(store_detail_processed_df["opening_date"])
        #   -7) set eeEurope and eeAmerica to Europe and America in the continent column
        store_detail_processed_df["continent"] = store_detail_processed_df["continent"].str.replace('eeEurope', 'Europe', regex=False)
        store_detail_processed_df["continent"] = store_detail_processed_df["continent"].str.replace('eeAmerica', 'America', regex=False)
        #   -8) convert the remaining text columns to string and the numeric columns to numbers
        non_string_columns = ["index", "opening_date", "longitude", "staff_numbers", "latitude"]
        store_detail_processed_df = store_detail_processed_df.astype({col: 'string' for col in store_detail_processed_df.columns if col not in non_string_columns})
        for numeric_column in ("longitude", "latitude", "staff_numbers"):
            store_detail_processed_df[numeric_column] = pd.to_numeric(store_detail_processed_df[numeric_column], errors='coerce')

        return store_detail_processed_df


    def clean_products_data(self, products_df : pd.DataFrame):
        """
        @desc: pre-process the products table
        @param products_df: raw products table
        @return: cleaned table with 'product_price (£)', 'weight (kg)' and EAN numeric,
                 date_added as datetime and the remaining text columns as string
        """
        #   -1) always begin by dropping duplicates
        products_processed_df = products_df.drop_duplicates()
        #   -2) rename the Unnamed column to index
        products_processed_df = products_processed_df.rename(columns={'Unnamed: 0': 'index'})
        #   -3) fill nans with empty strings to allow easier processing for the rest of the cleaning
        products_processed_df = products_processed_df.fillna('')
        #   -4) remove all rows whose date_added is pure alphanumeric (this includes the empty string);
        #       .copy() so the assignments below write to an independent frame
        is_junk_row = products_processed_df['date_added'].apply(self.is_alphanumeric)
        products_processed_df = products_processed_df[~is_junk_row].copy()
        #   -5) for weights : a) compute all multiplication expressions and replace with resultant value
        products_processed_df["weight"] = products_processed_df["weight"].apply(self.mullexp_to_netresult)
        #   -6) for weights : b) standardise them to 'kg'
        products_processed_df["weight"] = products_processed_df["weight"].apply(self.convert_weights)
        #   -7) drop £ and kg to enable weight and product price columns to be numeric
        products_processed_df["product_price"] = products_processed_df["product_price"].str.replace('£', '', regex=False)
        products_processed_df["weight"] = products_processed_df["weight"].str.replace('kg', '', regex=False)
        products_processed_df = products_processed_df.rename(columns={'product_price': 'product_price (£)', 'weight': 'weight (kg)'})
        #   -8) set product_price, weight, EAN to be numeric (unparseable values become NaN)
        for numeric_column in ("product_price (£)", "weight (kg)", "EAN"):
            products_processed_df[numeric_column] = pd.to_numeric(products_processed_df[numeric_column], errors='coerce')
        #   -9) set product_name, category, uuid, removed and product_code to string  AND  date_added to datetime
        products_processed_df = products_processed_df.astype({"product_name" : "string", "category" : "string", "uuid" : "string", "removed" : "string", "product_code" : "string"})
        products_processed_df['date_added'] = self.convert_date_to_yyyy_mm_dd(products_processed_df['date_added'])

        return products_processed_df


    def clean_event_date_data(self, events_df : pd.DataFrame):
        """
        @desc: pre-process the date details table
        @param events_df: raw date details table with year, month, day, timestamp,
                          time_period and date_uuid columns
        @return: cleaned table with an added 'datetime' column, string-typed
                 timestamp / time_period / date_uuid and numeric month / year / day
        """
        #   -1) drop duplicates
        events_df_processed = events_df.drop_duplicates()
        #   -2) remove all entries whose date_uuid is purely alphanumeric in nature;
        #       .copy() so the assignments below write to an independent frame
        is_junk_row = events_df_processed["date_uuid"].apply(self.is_alphanumeric)
        events_df_processed = events_df_processed[~is_junk_row].copy()
        #   -3) use info from year, month, day and timestamp to set a separate datetime column
        date_parts = events_df_processed[['year', 'month', 'day', 'timestamp']].astype(str)
        events_df_processed['datetime'] = pd.to_datetime(date_parts.agg(' '.join, axis=1), format='%Y %m %d %H:%M:%S')
        #   -4) set timestamp, time_period and date_uuid as string
        events_df_processed = events_df_processed.astype({"timestamp" : "string", "time_period" : "string", "date_uuid" : "string"})
        #   -5) set month, year and day as numeric (unparseable values become NaN)
        for date_part in ("month", "year", "day"):
            events_df_processed[date_part] = pd.to_numeric(events_df_processed[date_part], errors='coerce')

        return events_df_processed

## Main

In [31]:
# ingest the datasets
# step 1: create the DatabaseConnector instance (class defined in an earlier cell);
# presumably it is what the extractor below is meant to use — TODO confirm
connector = DatabaseConnector()

In [32]:
# NOTE(review): the recorded output of this cell is
# "TypeError: __init__() takes 1 positional argument but 2 were given", which suggests
# DataExtractor.__init__ does not accept a connector argument — confirm the constructor
# signature and either drop `connector` from this call or add the parameter to __init__.
extractor = DataExtractor(connector)


TypeError: __init__() takes 1 positional argument but 2 were given

In [None]:
def is_alpha(in_str):
    """
    @desc: function to check if the entry contains at least one alphabetic character
    """
    for character in in_str:
        if character.isalpha():
            return True
    return False

def is_alphanumeric(in_str):
    """
    @desc: function to check if the entry is made up only of letters, digits and
           underscores (an empty string also passes)
    """
    whole_match = re.fullmatch(r'^[a-zA-Z0-9_]*$', in_str)
    return whole_match is not None

def has_yyyy_mm_dd_format(in_str):
    """
    @desc: function to decide if an entry is written exactly as yyyy-mm-dd
           (four digits, two digits, two digits, separated by hyphens)
    """
    date_shape_match = re.fullmatch(r'\d{4}-\d{2}-\d{2}', in_str)
    return date_shape_match is not None

def convert_date_to_yyyy_mm_dd(in_column : pd.core.series.Series):
    """
    @desc: function to convert a column of free-form date strings to pandas datetimes
    @param in_column: Series of date strings in possibly mixed formats
    @return: datetime Series; strings that dateutil cannot parse become NaT
    """
    def _parse_or_nat(raw_value):
        # non-strings (NaN, None, existing timestamps) are left for pd.to_datetime to handle
        if not isinstance(raw_value, str):
            return raw_value
        try:
            return parse(raw_value)
        except (ValueError, OverflowError):
            # honour errors='coerce' below instead of aborting on the first bad string
            return pd.NaT

    # infer_datetime_format is deprecated and had no effect on already-parsed values
    return pd.to_datetime(in_column.apply(_parse_or_nat), errors='coerce')

In [None]:
# check for nulls or NaNs, alternative is np.unique(users_df.isnull()), or just use df.info()
# NOTE(review): `users_df` and `table_name` are not defined in this cell — confirm an
# earlier cell sets them, otherwise this fails on a fresh kernel
# isnull() and isna() are aliases, so a single count covers both
missing_count = int(users_df.isna().sum().sum())
if missing_count:
    # a bare string cannot be raised (that itself is a TypeError); use a real exception
    raise ValueError(f"The database : {table_name}, has total {missing_count} NULL values and {missing_count} NaN values")
else:
    print(f"[usrmsg] No NULLs or NaNs found in {table_name}")

# drop every row in which any cell contains the text 'NULL' (substring match)
has_null_text = users_df.apply(lambda row: row.astype(str).str.contains('NULL').any(), axis=1)
users_df_processed = users_df[~has_null_text]

# check for data types 
#   -1) always begin with dropping duplicates
users_df_processed = users_df_processed.drop_duplicates()
#   -2) set all columns except index to be of string format
str_convert_dict = {col: 'string' for col in users_df_processed.columns if col not in ['index']}
users_df_processed = users_df_processed.astype(str_convert_dict)
#   -3) remove all entries whose email_address is pure alphanumeric;
#       .copy() so the column assignments below write to an independent frame
#       (avoids pandas' SettingWithCopyWarning)
is_junk_row = users_df_processed['email_address'].apply(is_alphanumeric)
users_df_processed = users_df_processed[~is_junk_row].copy()
#   -4) DoB and join_date should be datetime format
users_df_processed['date_of_birth'] = convert_date_to_yyyy_mm_dd(users_df_processed['date_of_birth'])
users_df_processed['join_date'] = convert_date_to_yyyy_mm_dd(users_df_processed['join_date'])
#   -5) convert all 'GGB' country code to 'GB'
users_df_processed['country_code'] = users_df_processed['country_code'].str.replace('GGB', 'GB', regex=False)


Update your GitHub repository with the latest code changes from your local project. Start by staging your modifications and creating a commit. Then, push the changes to your GitHub repository.

Additionally, document your progress by adding to your GitHub README file. You can refer to the relevant lesson in the prerequisites for this task for more information.

At minimum, your README file should contain the following information:

> Project Title

> Table of Contents, if the README file is long

> A description of the project: what it does, the aim of the project, and what you learned

> Installation instructions

> Usage instructions

> File structure of the project

> License information
* You don't have to write all of this at once, but make sure to update your README file as you go along, so that you don't forget to add anything.

* Refactoring will be a continuous and constant process, but this is the time to really scrutinise your code.

You can use the following list to make improvements:

* Meaningful Naming: Use descriptive names for methods and variables to enhance code readability. For example, prefer `create_list_of_website_links()` over `links()`, and use `for element in web_element_list` instead of `for i in list`.
* Eliminate Code Duplication: Identify repeated code blocks and refactor them into separate methods or functions. This promotes code reusability and reduces the likelihood of bugs.
* Single Responsibility Principle (SRP): Ensure that each method has a single responsibility, focusing on a specific task. If a method handles multiple concerns, split it into smaller, focused methods.
* Access Modifiers: Make methods private or protected if they are intended for internal use within the class and not externally accessible
* Main Script Execution: Use the `if __name__ == "__main__":` statement to include code blocks that should only run when the script is executed directly, not when imported as a module.
* Consistent Import Order: Organize import statements in a consistent manner, such as alphabetically, and place from statements before import statements to maintain readability
* Avoid Nested Loops: Minimize nested loops whenever possible to improve code efficiency and reduce complexity
* Minimal Use of self: When writing methods in a class, only use self for variables that store information unique to each object created from the class. This helps keep the code organized and ensures that each object keeps its own special data separate from others.
* Avoid import *: Import only the specific methods or classes needed from a module to enhance code clarity and prevent naming conflicts
* Consistent Docstrings: Provide clear and consistent docstrings for all methods, explaining their purpose, parameters, and return values. This aids code understanding for other developers.
* Type Annotations: Consider adding type annotations to method signatures, variables, and return values to improve code maintainability and catch type-related errors during development