In [None]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import psycopg2
from datetime import datetime, timedelta
import pytz
import re
import os
import logging

In [None]:
# Logging setup: every record at INFO level or above is written to one text
# file, each line prefixed with the time it was emitted.
log_settings = {
    'filename': 'etl_data_warehouse_logs.txt',
    'level': logging.INFO,
    'format': '%(asctime)s - %(message)s',
}
logging.basicConfig(**log_settings)

In [None]:
# Connection parameters for the staging area.
# Every value can be supplied through an environment variable so that the
# credentials do not have to live in the notebook; the fallbacks reproduce the
# previous hard-coded local setup, so the cell still runs unchanged.
# NOTE(review): the fallback password is still committed here - remove it once
# the STAGING_DB_* variables are set wherever this notebook runs.
# NOTE(review): a password containing '@', ':' or '/' must be URL-encoded
# before it is placed in the connection string below.
username = os.environ.get('STAGING_DB_USER', 'postgres')
password = os.environ.get('STAGING_DB_PASSWORD', 'makhubela')
host = os.environ.get('STAGING_DB_HOST', 'localhost')
port = os.environ.get('STAGING_DB_PORT', '5432')
database = os.environ.get('STAGING_DB_NAME', 'DW_staging_area')

# Construct the connection string using variables
connection_string_staging = f'postgresql://{username}:{password}@{host}:{port}/{database}'

# Create the SQLAlchemy engine
engine_staging = create_engine(connection_string_staging)

In [None]:
# Connection parameters for the data warehouse.
# Every value can be supplied through an environment variable so that the
# credentials do not have to live in the notebook; the fallbacks reproduce the
# previous hard-coded local setup, so the cell still runs unchanged.
# NOTE(review): the fallback password is still committed here - remove it once
# the DW_DB_* variables are set wherever this notebook runs.
# NOTE(review): a password containing '@', ':' or '/' must be URL-encoded
# before it is placed in the connection string below.
username = os.environ.get('DW_DB_USER', 'postgres')
password = os.environ.get('DW_DB_PASSWORD', 'makhubela')
host = os.environ.get('DW_DB_HOST', 'localhost')
port = os.environ.get('DW_DB_PORT', '5432')
database = os.environ.get('DW_DB_NAME', 'data_warehouse')

# Construct the connection string using variables
connection_string_DW = f'postgresql://{username}:{password}@{host}:{port}/{database}'

# Create the SQLAlchemy engine
engine_DW = create_engine(connection_string_DW)

In [None]:
# Read every staging table into a DataFrame, stored in `table_data` under the
# table's name, and keep a running total of the rows read.
total_num_row = 0

table_data = {}

table_names = [
    'city', 'country', 'province',
    'day_part', 'day_date',
    'franchisee', 'product_category', 'stores',
    'product_group_level1', 'product_group_level2',
    'product_group_level3', 'product_group_level4',
    'sales_type', 'trading_hours', 'volumn_band',
    'owner_operator', 'order_type',
    'daily_sales_products', 'dw_batch',
]
try:
    for table in table_names:
        frame = pd.read_sql(f"SELECT * FROM {table}", con=engine_staging)
        table_data[table] = frame
        row_count = len(frame)
        # The same line goes to the log file and to the cell output.
        row_report = f"Number of rows in {table} : {row_count}"
        logging.info(row_report)
        print(row_report)
        total_num_row += row_count
except Exception:
    # The first failing table stops the loop; tables after it are not read.
    logging.warning("error occured: error loading tables as dataframes", exc_info=True)
print("the total number of all rows = ", total_num_row)
logging.info(f"the total number of all rows = ,{total_num_row}")


In [None]:
# Bind each staging table held in `table_data` to its own df_* variable.
# The assignments run one after another inside a single try: a missing key
# stops the sequence at that point, a warning with the traceback is logged,
# and the df_* names after it are left unassigned.
try:
    df_city = table_data['city']
    df_country = table_data['country']
    df_province = table_data['province']
    df_day_part = table_data['day_part']
    df_day_date = table_data['day_date']
    df_franchisee = table_data['franchisee']
    df_product_category = table_data['product_category']
    df_stores = table_data['stores']
    df_product_group_level1 = table_data['product_group_level1']
    df_product_group_level2 = table_data['product_group_level2']
    df_product_group_level3 = table_data['product_group_level3']
    df_product_group_level4 = table_data['product_group_level4']
    df_sales_type = table_data['sales_type']
    df_trading_hours = table_data['trading_hours']
    df_volumn_band = table_data['volumn_band']
    df_owner_operator = table_data['owner_operator']
    df_order_type = table_data['order_type']
    df_daily_sales_products = table_data['daily_sales_products']
    df_dw_batch = table_data['dw_batch']
    logging.info("loading tables as dataframes successul!")
except Exception as e:
    logging.warning("error occured: error loading tables as dataframes", exc_info=True)


In [None]:
# List of every staging DataFrame that is loaded into the warehouse
# (df_dw_batch is not part of it).
total_num_row = 0
data_frames = [df_city,
               df_country,
               df_province,
               df_day_date,
               df_day_part,
               df_franchisee,
               df_product_category,
               df_stores,
               df_product_group_level1,
               df_product_group_level2,
               df_product_group_level3,
               df_product_group_level4,
               df_sales_type,
               df_trading_hours,
               df_volumn_band,
               df_owner_operator,
               df_order_type,
               df_daily_sales_products]

# Remove the bookkeeping columns [sbID and dwbID] from every table.
# The drop stays in place because the df_* variables refer to these same
# objects. errors='ignore' lets a table that has only one (or neither) of the
# columns through: without it the first such table raised KeyError and every
# table after it kept its columns, and re-running this cell always failed.
try:
    for table in data_frames:
        table.drop(columns=['sbID', 'dwbID'], inplace=True, errors='ignore')
    logging.info("dropping columns: sbID and/or dwbID successful!")
except Exception:
    logging.warning("error occured: error dropping columns: sbID and/or dwbID ", exc_info=True)

In [None]:
# Transformations
# Step 1: attach province data to every city, then place the country frame
# beside the result.
try:
    df_merge = df_city.merge(df_province, on='ProvinceID', how='inner')
    # axis=1 lines the two frames up by row index, not by a key column.
    # NOTE(review): this presumably relies on a single-country data set - confirm.
    df_concat = pd.concat([df_merge, df_country], axis=1)
    logging.info('merging city,province and country tables successful!')
except Exception:
    logging.warning("error occured: error merging city,province and country tables", exc_info=True)

# Step 2: build the location table
try:
    # Rows without a country name receive the name of the first country row.
    first_country_name = df_country['CountryName'].iloc[0]
    df_concat['CountryName'] = df_concat['CountryName'].fillna(first_country_name)
    df_location = df_concat[['CityID', 'CityName', 'CountryName', 'ProvinceName']]
except Exception:
    logging.warning("error occured: error creating location table", exc_info=True)

# Step 3: the city key becomes the location key
df_location = df_location.rename(columns={'CityID': 'LocationID'})

# Step 4: the fact table gets a LocationID column copied from its CityID
try:
    df_daily_sales_products['LocationID'] = df_daily_sales_products['CityID']
    logging.info("LocationID column successfully create")
except Exception:
    logging.warning("error occured: error creating LocationID in the daily_sales_products fact table", exc_info=True)

In [None]:
# Renaming columns
# The stores table and the fact table carry the same two brand-named columns,
# so a single mapping is applied to both.
brand_neutral_columns = {'McDelivery': 'Deliveries', 'McCafe': 'Cafe'}
try:
    df_stores = df_stores.rename(columns=brand_neutral_columns)
    df_daily_sales_products = df_daily_sales_products.rename(columns=brand_neutral_columns)
    logging.info("Renamed columns McDelivery and McCafe for the stores table,and daily_sales_products ")
except Exception:
    logging.warning("error occured: error Renaming columns McDelivery and McCafe for the stores table,and daily_sales_products", exc_info=True)

# Substrings to swap out of owner e-mail addresses, in the order they are applied
dict_owner_emails = dict(macdonalds='restuarant', mcd='rs', mc='rs')

# Reference to the e-mail column as it is before any replacement
original_email = df_owner_operator['EmailAddress']

def replace_trademarks_1(value, replacements=None):
    """Replace every configured substring in ``value``.

    Parameters
    ----------
    value : object
        The text to clean. Anything that is not a ``str`` (``None``, a NaN
        float from a missing cell, ...) is returned untouched instead of
        failing on ``.replace``.
    replacements : dict, optional
        Mapping of substring -> replacement, applied in iteration order.
        Defaults to the module-level ``dict_owner_emails``.

    Returns
    -------
    object
        The cleaned string, or ``value`` itself when it is not a string.
    """
    # Missing cells are not always None: pandas also uses a float NaN,
    # which has no .replace method.
    if not isinstance(value, str):
        return value
    if replacements is None:
        replacements = dict_owner_emails
    for original, replacement in replacements.items():
        value = value.replace(original, replacement)
    return value
try:
    # Apply the custom function to the 'EmailAddress' column
    df_owner_operator['EmailAddress'] = df_owner_operator['EmailAddress'].apply(replace_trademarks_1)
    logging.info("changing emails to not reference McDonald's successful")
except Exception:
    logging.warning("error occured: error changing emails to not reference McDonald's ", exc_info=True)
new_email = df_owner_operator['EmailAddress']
# Count the number of rows that changed
try:
    # An element-wise != reports missing-vs-missing as "different", so rows
    # that were empty before and are still empty are masked out of the count.
    email_missing_both = original_email.isna() & new_email.isna()
    changed_rows_emails = (original_email.ne(new_email) & ~email_missing_both).sum()
    print(f" number of changed rows {changed_rows_emails}")
    logging.info(f" number of changed rows {changed_rows_emails}")
except Exception:
    logging.warning("error occured: error in number of changed rows for owner operator email column", exc_info=True)
try:
    # Snapshot (copy) of the names before the change, so the comparison below
    # cannot be affected by the column being overwritten.
    original_franchisee = df_franchisee['FranchiseeName'].copy()
    # Remove the literal text 'Mc' from the 'FranchiseeName' column
    df_franchisee['FranchiseeName'] = original_franchisee.str.replace('Mc', '', regex=False)
    # new franchisee variable after changes
    new_franchisee_name = df_franchisee['FranchiseeName']
    # Count the number of rows that changed (rows missing on both sides excluded)
    franchisee_missing_both = original_franchisee.isna() & new_franchisee_name.isna()
    changed_rows_franchisee_names = (original_franchisee.ne(new_franchisee_name) & ~franchisee_missing_both).sum()
    print(f" number of changed rows {changed_rows_franchisee_names}")
    logging.info(f" number of changed rows {changed_rows_franchisee_names} in franchisee table")
except Exception:
    logging.warning("error occured: error in changing franchisee table, FranchiseeName column", exc_info=True)

In [None]:
# Reference to the product descriptions as they are before the trademark
# replacement; it is compared with the rewritten column to count changed rows.
orginal_product_desc = df_product_group_level4['ProductLevel4Desc']

# Trademarked wording -> neutral wording for product descriptions.
# Keys are case-sensitive. The entry order is kept exactly as it was because
# a caller that applies the entries one after another depends on it.
dict_trademarks = {
    'McFlurry': 'Ice Cream',
    'Mcflurry': 'Ice Cream',
    'Delux': 'Burger',
    'Deluxe': 'Burger',
    'BigMacDev': 'Tower Burger',
    'Mac': 'Specialty Burger',
    'Boerie': 'Boerivoers',
    'Foldover': 'Sandwich',
    'Big Mac': 'Big Burger',
    'Grand': 'Gigalicous',
    'Junior': 'Kiddie',
    'Quarter': 'Single Patty',
    'Qtr': 'Single Patty',
    'McMuffin': 'English Muffin',
    'Fillet-O-Fish': 'Fish Block',
    'bigtasty': 'Sizeable Delight',
    # NOTE(review): this replacement starts with a space - confirm it is intended.
    'Big Tasty': ' Sizeable Delight',
    'Mcfeast': 'Banquet',
    'McFeast': 'Banquet',
    'Mcchicken': 'Chicken Burger',
    'McChicken': 'Chicken Burger',
    'Upsize': 'Enlarge',
    'QP': 'Single Patty',
    'Grand Big MAC': 'Gigalicous Big Burger',
    'BOGOF': 'Drama Burger',
    'Sharebag': 'Share Meal',
    'sharebag': 'Share Meal',
    'Share Box': 'Family Pack',
    'ShareBox': 'Family Pack',
    'Jr. MAC': 'Regular Burger',
    'Happy': 'Joyful',
    'McFizz': 'Soda Float',
    'QPC': 'Single Patty Cheese',
    'Grand Chicken': 'Gigalicous Chicken',
    'Green Apple Fizz': 'Green Apple Soda',
    'Koko Donut': 'Chocolate Donut',
    'Bigtasty': 'Mega Burger',
    'McBites': 'Fried Chicken Snacks',
    'Mac Jr': 'Regular Burger',
    'Mc Wings': 'Chicken Wings',
    'McBraai': 'Braai',
    'McDouble': 'Double Patty Burger',
    'McMixa': 'Decadent Ice Cream',
    'Mcdipper': 'Cheese Sticks',
    'McRoyale': 'Cheesy Burger',
}
#replacing function for Mcdonald's trademarks
def replace_trademarks_2(value, replacements=None):
    """Replace trademarked wording in ``value`` in a single pass.

    Parameters
    ----------
    value : object
        The text to clean. Anything that is not a ``str`` (``None``, a NaN
        float from a missing cell, ...) is returned untouched.
    replacements : dict, optional
        Mapping of term -> replacement. Defaults to the module-level
        ``dict_trademarks``.

    Returns
    -------
    object
        The cleaned string, or ``value`` itself when it is not a string.

    Notes
    -----
    Applying the entries one after another let a short term rewrite part of a
    longer one before the longer one was tried ('Delux' turned 'Deluxe' into
    'Burgere'; 'Mac' prevented 'Big Mac' from ever matching). Here all terms
    are matched in one scan with the longest term tried first, and text that
    has already been replaced is never scanned again.
    """
    if not isinstance(value, str):
        return value
    if replacements is None:
        replacements = dict_trademarks
    if not replacements:
        return value
    # Longest first: in a regex alternation the first alternative that
    # matches at a position wins.
    terms = sorted(replacements, key=len, reverse=True)
    pattern = re.compile('|'.join(re.escape(term) for term in terms))
    return pattern.sub(lambda found: replacements[found.group(0)], value)
try:
    # Rewrite every product description through replace_trademarks_2
    df_product_group_level4['ProductLevel4Desc'] = df_product_group_level4['ProductLevel4Desc'].apply(replace_trademarks_2)
    new_product_desc = df_product_group_level4['ProductLevel4Desc']
    # Rows whose description differs from the one held before the rewrite
    changed_rows_desc = orginal_product_desc.ne(new_product_desc).sum()
    change_report = f" number of changed rows {changed_rows_desc}"
    print(change_report)
    logging.info(f"{change_report} in ProductGroupLevel4 table,ProductLevel4Desc column. removed macdonald's references")
except Exception:
    logging.warning("error occured: error in changing ProductGroupLevel4 table, ProductLevel4Desc column. could not remove macdonald's references", exc_info=True)

In [None]:
#
def is_valid_email(email):
    """Return True when ``email`` is a string shaped like ``name@domain.tld``.

    Parameters
    ----------
    email : object
        The value to check. Non-strings (``None``, a NaN float from a missing
        cell, numbers) are reported as invalid instead of raising TypeError.

    Returns
    -------
    bool
    """
    if not isinstance(email, str):
        return False
    pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    # fullmatch: with match(), '$' also matches just before a trailing
    # newline, so 'a@b.com\n' used to be accepted.
    return re.fullmatch(pattern, email) is not None

# Function to clean the emails
def clean_email(email):
    """Return ``email`` unchanged when is_valid_email accepts it, else None."""
    return email if is_valid_email(email) else None
try:
    # Dry run: clean a view of the column first, only to count what would change
    original_emails = df_stores['EmailAddress']
    cleaned_emails = original_emails.apply(clean_email)
    # Rows whose value differs once invalid addresses are blanked
    changed_rows_email = original_emails.ne(cleaned_emails).sum()
    logging.info(f"Successfully removed non-valid emails. Number of rows changed {changed_rows_email},in stores table,column EmailAddress.")
except Exception:
    logging.warning("error occured: error removing non-valid emails", exc_info=True)
print("Number of rows changed:", changed_rows_email)

# The cleaning itself is applied to the stores table here
df_stores['EmailAddress'] = df_stores['EmailAddress'].apply(clean_email)

In [None]:
# Function to check if a phone number is in the correct format
def is_valid_phone(phone):
    """Return True when ``phone`` is a string of exactly 10 digits starting with 0.

    Parameters
    ----------
    phone : object
        The value to check. Non-strings (``None``, a NaN float, a number read
        from a numeric column) are reported as invalid instead of raising
        TypeError.

    Returns
    -------
    bool
    """
    if not isinstance(phone, str):
        return False
    pattern = r'^0\d{9}$'  # Phone number should start with 0 and have exactly 10 digits
    # fullmatch: with match(), '$' also matches just before a trailing
    # newline, so '0123456789\n' used to be accepted.
    return re.fullmatch(pattern, phone) is not None

# Function to clean the phone numbers
def clean_phone(phone):
    """Return ``phone`` unchanged when is_valid_phone accepts it, else None."""
    return phone if is_valid_phone(phone) else None
try:
    # Dry run on a view of the column, only to count what would change
    original_TelephoneNumber = df_stores['TelephoneNumber']
    cleaned_TelephoneNumber = original_TelephoneNumber.apply(clean_phone)
    # Count the number of rows that changed
    changed_rows_TelephoneNumber = (original_TelephoneNumber != cleaned_TelephoneNumber).sum()
    # The message used to name a ContactNumber column; the column cleaned
    # here is TelephoneNumber.
    logging.info(f"Successfully removed non-valid contact numbers. Number of rows changed {changed_rows_TelephoneNumber},in stores table,column TelephoneNumber.")
except Exception:
    logging.warning("error occured: error removing non-valid contact numbers", exc_info=True)
print("Number of rows changed:", changed_rows_TelephoneNumber)

# The cleaning itself is applied to the stores table here
df_stores['TelephoneNumber'] = df_stores['TelephoneNumber'].apply(clean_phone)

In [None]:
# Convert StartTime and EndTime of the day_part table to time values
try:
    day_part_time = ('StartTime','EndTime')

    for start_end_time in day_part_time:
        # Work on the text form of the stored value
        df_day_part[start_end_time] = df_day_part[start_end_time].astype(str)
        # Left-pad with zeros so every value is 6 characters long (HHMMSS)
        df_day_part[start_end_time] = df_day_part[start_end_time].map(lambda raw: raw.zfill(6))
        # Insert the separators: HHMMSS -> HH:MM:SS
        df_day_part[start_end_time] = df_day_part[start_end_time].map(lambda padded: f"{padded[:2]}:{padded[2:4]}:{padded[4:]}")
        # Parse, then keep only the time-of-day part of the parsed value
        parsed = pd.to_datetime(df_day_part[start_end_time], format='%H:%M:%S')
        df_day_part[start_end_time] = parsed.dt.time
    logging.info("Successfully formated StartTime and EndTime to time data type in DayPart table")
except Exception:
    logging.warning("error occured: error formating StartTime and EndTime to time data type in DayPart table ", exc_info=True)

In [None]:
try:
    # Treat the literal text 'None' as a missing value
    contact_numbers = df_owner_operator['ContactNumber'].replace('None', np.nan)
    # Text form of every value: drop anything after a decimal point (numbers
    # read as floats look like '821234567.0') and left-pad with zeros to 10
    # characters.
    contact_text = contact_numbers.astype(str).str.split('.').str[0].str.zfill(10)
    # Keep the formatted text only where a number was present. Previously
    # missing values were converted to text as well and ended up in the
    # column as the strings '0000000nan' or 'None'; they now stay missing.
    df_owner_operator['ContactNumber'] = contact_text.where(contact_numbers.notna())
    logging.info(" Successfully converted ContactNumber column to a string in OwnerOperator table ")
except Exception:
    logging.warning("error occured: error converting ContactNumber column to a string in OwnerOperator table ", exc_info=True)

In [None]:
# change ints to boolean in owner_operator and franchisee tables
# The flag is read as a number and compared with 1. Comparing the text form
# with '1' treated 1.0 and an already-converted True as False, so re-running
# this cell silently reset every flag to False.
try:
    df_tbl_names = (df_owner_operator,df_franchisee)

    for tbl in df_tbl_names:
        tbl['Active'] = pd.to_numeric(tbl['Active'], errors='coerce').eq(1)
    logging.info("Successfully converted integers into boolean type in Active column in OwnerOperator and franchisee tables.")
except Exception:
    logging.warning("error occured: error converting integers into boolean type in Active column in OwnerOperator and franchisee tables. ", exc_info=True)
# change ints to boolean in stores and daily_sales_products table
try:
    colms_names = ('Cafe','GeneratorID','DriveThru','Deliveries','OpenStatusID','PlayPlace','Wifi','DessertKiosk')

    for col in colms_names:
        df_stores[col] = pd.to_numeric(df_stores[col], errors='coerce').eq(1)
        df_daily_sales_products[col] = pd.to_numeric(df_daily_sales_products[col], errors='coerce').eq(1)
    # Logged once, after every column has been converted (it used to be
    # logged inside the loop, once per column, before the work was finished).
    logging.info("Successfully converted integers into boolean type in stores and DailySalesProducts")
except Exception:
    logging.warning("error occured: error converting integers into boolean type in stores and DailySalesProducts. ", exc_info=True)

In [None]:
try:
    # Current time in the South African time zone
    sa_tz = pytz.timezone('Africa/Johannesburg')
    start_time = datetime.now(sa_tz)
    print(f'start time :{start_time}')
    # One-row frame whose only column is start_load_date, appended to dw_batch
    df_start_date = pd.DataFrame({'start_load_date': [start_time]})
    df_start_date.to_sql('dw_batch', engine_staging, if_exists='append', index=False)
    success_note = "start time successfully loaded into dw_batch table"
    logging.info(success_note)
    print(success_note)
except Exception:
    logging.warning("error occured: start time not loaded in the dw_batch table", exc_info=True)
   

In [None]:
# Warehouse table name -> DataFrame to load into it.
# The entry order is the order in which the load loop iterates, so it is kept as is.
dict_table_names = dict(
    dim_country=df_country,
    dim_province=df_province,
    dim_city=df_city,
    dim_order_type=df_order_type,
    dim_sales_type=df_sales_type,
    dim_trading_hours=df_trading_hours,
    dim_volumn_band=df_volumn_band,
    dim_day_date=df_day_date,
    dim_day_part=df_day_part,
    dim_location=df_location,
    dim_owner_operator=df_owner_operator,
    dim_franchisee=df_franchisee,
    dim_product_category=df_product_category,
    dim_product_group_level1=df_product_group_level1,
    dim_product_group_level2=df_product_group_level2,
    dim_product_group_level3=df_product_group_level3,
    dim_product_group_level4=df_product_group_level4,
    dim_stores=df_stores,
    fact_daily_sales=df_daily_sales_products,
)

In [None]:
# Resumable load into the data warehouse.
# Names of tables that were appended successfully are kept in a small text
# file; a later run skips those tables, so a partially failed load can be
# repeated without appending the same rows twice. The file is removed once
# every table in dict_table_names has been loaded.
progress_file = "successfully_loaded_tables.txt"

# Check if the file storing successfully loaded tables exists
if os.path.exists(progress_file):
    with open(progress_file, "r") as file:
        successfully_loaded_tables = file.read().splitlines()
else:
    successfully_loaded_tables = []

# (An unused `engine_DW.begin()` call was removed here: its result was never
# entered or committed, and to_sql below works on the engine directly.)
for table_name, df in dict_table_names.items():
    if table_name in successfully_loaded_tables:
        print(f"Table '{table_name}' has already been successfully loaded. Skipping...")
        continue

    try:
        df.to_sql(table_name, engine_DW, if_exists='append', index=False)
        print(f"Table '{table_name}' successfully loaded.")
        successfully_loaded_tables.append(table_name)
        logging.info(f"Table '{table_name}' successfully loaded.")
    except Exception as e:
        # One failing table does not stop the others from being attempted.
        print(f"Error loading table '{table_name}': {e}")
        logging.warning(f"error occured: Error loading table '{table_name}': {e}")
# Write the list of successfully loaded tables to the file
with open(progress_file, "w") as file:
    file.write("\n".join(successfully_loaded_tables))

# Decide by name, not by count: a stale or duplicated line in the progress
# file could make the two lengths agree (or disagree) by accident.
pending_tables = [name for name in dict_table_names if name not in successfully_loaded_tables]
if pending_tables:
    print(f"Some tables have not been loaded: {pending_tables}. The file has not been deleted.")
    logging.warning(f"error occured: Some tables have not been loaded: {pending_tables}. The file has not been deleted")
else:
    try:
        os.remove(progress_file)
        print("All tables have been successfully loaded. The file has been deleted.")
        logging.info("All tables have been successfully loaded. The file has been deleted")
    except OSError:
        logging.warning("error occured: All tables have been loaded but the file could not be deleted", exc_info=True)

In [None]:
# Time at which the warehouse load finished, in the South African time zone
sa_tz = pytz.timezone('Africa/Johannesburg')
end_time = datetime.now(tz=sa_tz)
logging.info(f'Successfully finished loading at end_time:{end_time}')
print(f'end_time:{end_time}')

In [None]:
# Highest dwbID currently stored in the staging dw_batch table
sql_line = 'SELECT "dwbID" FROM dw_batch order by "dwbID" desc limit 1'
df_dw_batch = pd.read_sql_query(sql=sql_line, con=engine_staging)
# The single value of the one-row result
dw_batch_id = df_dw_batch['dwbID'].squeeze()
# Plain Python int, used later as a bind parameter
dw_batch_id_int = int(dw_batch_id)
print(dw_batch_id)


In [None]:
# Write the load end time onto the latest dw_batch record, provided that
# record is still the batch this run read earlier (dw_batch_id).
sql_line = 'SELECT * FROM dw_batch ORDER BY "dwbID" DESC LIMIT 1'

try:
    # Read the latest record into a DataFrame
    df_dw_batch = pd.read_sql_query(sql_line, engine_staging)

    # Check if the latest record matches the dw_batch_id
    latest_batch_id = df_dw_batch["dwbID"].iloc[0]
    print("Latest dw_batch_id from database:", latest_batch_id)
    print(end_time)
    if latest_batch_id == dw_batch_id:
        # Update the end_load_date column with the provided end_time
        update_sql = text('UPDATE "dw_batch" SET "end_load_date" = :end_time WHERE "dwbID" = :dw_batch_id')
        # Used as a context manager, engine.begin() commits when the block
        # succeeds, rolls back when it raises, and always releases the
        # connection. The earlier code called .rollback() on the object
        # returned by engine.begin(), which has no such method, so the real
        # database error was replaced by an AttributeError.
        with engine_staging.begin() as connection:
            connection.execute(update_sql, {'end_time': end_time, 'dw_batch_id': dw_batch_id_int})
        print("end_load_date updated successfully.")
        logging.info("end_load_date updated successfully.")
    else:
        print("dw_batch_id does not match the latest record in the database.")
        logging.warning("dw_batch_id does not match the latest record in the database.")
except SQLAlchemyError as e:
    # The transaction has already been rolled back by the context manager
    print(f"Error occurred: {e}")
    logging.warning(f"Error occurred: {e}")


In [None]:
# Staging-area tables that carry a dwbID column to be stamped with the batch id
table_list = [
    'city', 'country', 'province',
    'day_part', 'day_date',
    'franchisee', 'product_category', 'stores',
    'product_group_level1', 'product_group_level2',
    'product_group_level3', 'product_group_level4',
    'sales_type', 'trading_hours', 'volumn_band',
    'owner_operator', 'order_type',
    'daily_sales_products',
]

In [None]:
# Stamp the current batch id onto every staging row that does not have one yet.
# Each table is updated in its own transaction: engine.begin() commits on
# success, rolls back on error and always releases the connection (the manual
# connect/commit/close sequence left the connection open when execute failed).
for table_name in table_list:
    try:
        # table_name comes from the fixed list in this notebook; the batch id
        # is passed as a bound parameter.
        update_sql = text(f'UPDATE "{table_name}" SET "dwbID" = :dw_batch_id_int WHERE "dwbID" IS NULL')
        with engine_staging.begin() as connection:
            connection.execute(update_sql, {'dw_batch_id_int': dw_batch_id_int})
        # The messages used to mention end_load_date; the column updated here is dwbID.
        print("dwbID updated successfully for table:", table_name)
        logging.info(f"dwbID updated successfully for table: {table_name}")
    except Exception as e:
        print(f"Error updating dwbID in table '{table_name}': {e}")
        logging.warning(f"Error updating dwbID in table '{table_name}': {e}")

In [None]:
# Data-warehouse tables whose IsCurrent column is filled in after the load.
# NOTE(review): dim_day_date is loaded into the warehouse but is not listed
# here - confirm that this is intentional.
table_list2 = [
    'dim_city', 'dim_country', 'dim_province', 'dim_location',
    'dim_day_part',
    'dim_franchisee', 'dim_product_category', 'dim_stores',
    'dim_product_group_level1', 'dim_product_group_level2',
    'dim_product_group_level3', 'dim_product_group_level4',
    'dim_sales_type', 'dim_trading_hours', 'dim_volumn_band',
    'dim_owner_operator', 'dim_order_type',
    'fact_daily_sales',
]

In [None]:
# Mark every warehouse row that has no IsCurrent value yet as current.
# Each table is updated in its own transaction: engine.begin() commits on
# success, rolls back on error and always releases the connection (the manual
# connect/commit/close sequence left the connection open when execute failed).
for table_name in table_list2:
    try:
        # table_name comes from the fixed list in this notebook
        update_sql_is_current = text(f'UPDATE "{table_name}" SET "IsCurrent" = True WHERE "IsCurrent" IS NULL')
        with engine_DW.begin() as connection:
            connection.execute(update_sql_is_current)
        print("IsCurrent updated successfully for table:", table_name)
        logging.info(f"IsCurrent updated successfully for table:{table_name}")
    except Exception as e:
        print(f"Error loading table '{table_name}': {e}")
        logging.warning(f"Error loading isCurrent in table '{table_name}': {e}")


In [None]:
# Data-warehouse tables whose Effective column is filled in after the load.
# NOTE(review): dim_day_date is loaded into the warehouse but is not listed
# here - confirm that this is intentional.
table_list3 = [
    'dim_city', 'dim_country', 'dim_province', 'dim_location',
    'dim_day_part',
    'dim_franchisee', 'dim_product_category', 'dim_stores',
    'dim_product_group_level1', 'dim_product_group_level2',
    'dim_product_group_level3', 'dim_product_group_level4',
    'dim_sales_type', 'dim_trading_hours', 'dim_volumn_band',
    'dim_owner_operator', 'dim_order_type',
    'fact_daily_sales',
]

In [None]:
# Fill the Effective column of every warehouse row that has none yet with the
# load end time.
# Each table is updated in its own transaction (engine.begin() commits on
# success, rolls back on error, releases the connection), so one failing table
# cannot leave a shared connection in a failed state for the tables after it.
for effective_time_tables in table_list3:
    try:
        # The table name comes from the fixed list in this notebook; the
        # timestamp is passed as a bound parameter.
        update_sql = text(f'UPDATE "{effective_time_tables}" SET "Effective" = :end_time WHERE "Effective" IS NULL')
        with engine_DW.begin() as connection:
            connection.execute(update_sql, {'end_time': end_time})
        print("Effective updated successfully for table:", effective_time_tables)
        logging.info(f"Effective updated successfully for table:{effective_time_tables}")
    except Exception as e:
        # The warning used to sit outside the except clause, so it was logged
        # on every iteration, and it named `table_name`, a leftover variable
        # from an earlier loop, instead of the table being processed.
        print(e)
        logging.warning(f"Error loading Effective in table {effective_time_tables}: {e}")