## Importing Libraries and Loading Environment Variables

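I start by importing the essential libraries: os, pandas, python-dotenv, SQLAlchemy, psycopg2, openpyxl, and datetime. These cover data manipulation, database interaction, and reading Excel files. SQLAlchemy builds the database connection, psycopg2 is the PostgreSQL driver it uses, and openpyxl is the engine pandas uses for .xlsx files. I then call load_dotenv to load the variables from my .env file into the environment, so that the values in use always match the file.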

In [13]:
%pip install sqlalchemy


Note: you may need to restart the kernel to use updated packages.


In [1]:

import os  # used to read environment variables
import pandas as pd  # data analysis and manipulation library
from dotenv import load_dotenv  # loads environment variables from a .env file into my Python environment
from sqlalchemy import create_engine  # creates the database engine used to write DataFrames to PostgreSQL
import psycopg2  # database adapter that enables Python applications to interact with PostgreSQL databases
import openpyxl  # library pandas uses to read from and write to Excel 2010+ (.xlsx) files
from datetime import datetime  # used to timestamp the data when it is loaded

load_dotenv(override=True)  # values in .env replace any already set in the environment, so re-running this cell picks up edits to .env without restarting the kernel

pg_url = os.getenv('POSTGRES_URL')  # retrieves my database URL (None if the variable is not set)

type(pg_url) 

str
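`os.getenv` returns `None` instead of raising an error when a variable is missing. This means a typo in `.env` only surfaces later, when `create_engine` receives `None`. The `type(pg_url)` check above is one manual guard. Below is a minimal fail-fast sketch. The helper name `get_database_url` is hypothetical. The scheme rewrite is an assumption that only matters if your provider hands out URLs beginning with `postgres://`, because SQLAlchemy 1.4 and later accept only `postgresql://`.

```python
import os


def get_database_url(var_name: str = "POSTGRES_URL") -> str:
    """Hypothetical helper: return the database URL or fail with a clear message."""
    url = os.getenv(var_name)
    if not url:
        # fail fast instead of passing None on to create_engine later
        raise RuntimeError(f"Environment variable {var_name} is not set; check your .env file.")
    # SQLAlchemy 1.4+ recognises the dialect name 'postgresql', not 'postgres'
    if url.startswith("postgres://"):
        url = "postgresql://" + url[len("postgres://"):]
    return url
```

In the notebook this would be used as `pg_url = get_database_url()` after `load_dotenv(override=True)`.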

## In the beginning...

After importing the necessary libraries, the next logical step is to explore and understand the data I am working with. To do this, I read the Excel file into a DataFrame with the pandas library, as shown below.

In [3]:
file_path = '/Users/tamarainwang/Downloads/Week_6/hands_on_proj/global-superstore-data.xlsx'

df = pd.read_excel(file_path)


### Exploring the dataframe 

In [4]:
df.head()

| | Row ID | Order ID | Order Date | Ship Date | Ship Mode | Customer ID | Customer Name | Segment | City | State | ... | Product ID | Category | Sub-Category | Product Name | Sales | Quantity | Discount | Profit | Shipping Cost | Order Priority |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 32298 | CA-2012-124891 | 2012-07-31 | 2012-07-31 | Same Day | RH-19495 | Rick Hansen | Consumer | New York City | New York | ... | TEC-AC-10003033 | Technology | Accessories | Plantronics CS510 - Over-the-Head monaural Wir... | 2309.65 | 7 | 0.0 | 762.1845 | 933.57 | Critical |
| 1 | 26341 | IN-2013-77878 | 2013-02-05 | 2013-02-07 | Second Class | JR-16210 | Justin Ritter | Corporate | Wollongong | New South Wales | ... | FUR-CH-10003950 | Furniture | Chairs | Novimex Executive Leather Armchair, Black | 3709.395 | 9 | 0.1 | -288.765 | 923.63 | Critical |
| 2 | 25330 | IN-2013-71249 | 2013-10-17 | 2013-10-18 | First Class | CR-12730 | Craig Reiter | Consumer | Brisbane | Queensland | ... | TEC-PH-10004664 | Technology | Phones | Nokia Smart Phone, with Caller ID | 5175.171 | 9 | 0.1 | 919.971 | 915.49 | Medium |
| 3 | 13524 | ES-2013-1579342 | 2013-01-28 | 2013-01-30 | First Class | KM-16375 | Katherine Murray | Home Office | Berlin | Berlin | ... | TEC-PH-10004583 | Technology | Phones | Motorola Smart Phone, Cordless | 2892.51 | 5 | 0.1 | -96.54 | 910.16 | Medium |
| 4 | 47221 | SG-2013-4320 | 2013-11-05 | 2013-11-06 | Same Day | RH-9495 | Rick Hansen | Consumer | Dakar | Dakar | ... | TEC-SHA-10000501 | Technology | Copiers | Sharp Wireless Fax, High-Speed | 2832.96 | 8 | 0.0 | 311.52 | 903.04 | Critical |


In [5]:
df.columns

# The column names need to be properly formatted: they contain spaces and hyphens.

Index(['Row ID', 'Order ID', 'Order Date', 'Ship Date', 'Ship Mode',
       'Customer ID', 'Customer Name', 'Segment', 'City', 'State', 'Country',
       'Postal Code', 'Market', 'Region', 'Product ID', 'Category',
       'Sub-Category', 'Product Name', 'Sales', 'Quantity', 'Discount',
       'Profit', 'Shipping Cost', 'Order Priority'],
      dtype='object')
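Beyond the column names, a few one-line checks give a quick picture of data quality before any transformation. The sketch below wraps them in a hypothetical `profile_dataframe` helper and runs it on a tiny made-up frame. On the real data you would pass `df` instead.

```python
import pandas as pd


def profile_dataframe(frame: pd.DataFrame) -> dict:
    """Hypothetical helper: summarize shape, missing values and exact duplicate rows."""
    return {
        "rows": len(frame),
        "columns": frame.shape[1],
        # missing values per column, keeping only columns that have any
        "nulls": {col: int(n) for col, n in frame.isna().sum().items() if n > 0},
        # rows that are exact copies of an earlier row
        "duplicate_rows": int(frame.duplicated().sum()),
    }


# made-up sample rows, not taken from the dataset
sample = pd.DataFrame({
    "Order ID": ["A-1", "A-1", "A-2", "A-2"],
    "Postal Code": [10024.0, None, None, None],
    "Sales": [10.0, 5.0, 7.5, 7.5],
})
print(profile_dataframe(sample))
```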

## Loading data from the Excel file into a DataFrame

Now that I have explored the data and seen that the column names need formatting (they contain spaces and hyphens), my work can begin. The first step is to write a function that reads my global-superstore-data.xlsx Excel file into a pandas DataFrame. Using functions lets me write the logic once and call it whenever I need to perform that task, rather than rewriting the same code again and again.

In [7]:
def load_excel_data(data_source: str, sheet_name: str = None):
    """This function reads an Excel file from the specified path (data_source) and loads data into a Pandas DataFrame. 
    If a sheet name is provided, it loads data from that specific sheet. 
    If no sheet name is specified, it loads data from the first sheet by default"""
    print(f'Loading data from {data_source}...')
    if sheet_name:
        raw_data = pd.read_excel(data_source, sheet_name=sheet_name)
    else:
        raw_data = pd.read_excel(data_source)
    return raw_data

## Data Transformation

Next, I defined the function "transform_column_names" to format my DataFrame column names. It removes leading and trailing spaces, converts the names to lowercase, and replaces spaces and hyphens with underscores. This standardizes the column names for database compatibility, because unquoted PostgreSQL identifiers cannot contain spaces or hyphens.


In [8]:
# fxn to transform column names

def transform_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """Strips surrounding whitespace from each column name, lowercases it, and replaces spaces and hyphens with underscores.
    The DataFrame's columns are modified in place and the same DataFrame is returned."""
    df.columns = [column.strip().lower().replace(' ','_').replace('-','_') for column in df.columns]
    return df
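The chained `replace` calls handle only spaces and hyphens. If other punctuation ever appears in a header (a slash or parentheses, say), a regular-expression variant can collapse every run of non-alphanumeric characters into a single underscore. The following is a sketch of an alternative, not the approach used in this project. `to_snake_case` is a hypothetical name.

```python
import re


def to_snake_case(name: str) -> str:
    """Hypothetical helper: lowercase a column name and replace runs of other characters with '_'."""
    cleaned = re.sub(r"[^0-9a-z]+", "_", name.strip().lower())
    # drop underscores left at either end, e.g. from a trailing ')'
    return cleaned.strip("_")


headers = ["Row ID", "Sub-Category", " Shipping Cost ", "Sales (USD)", "Ship/Mode"]
print([to_snake_case(h) for h in headers])
```

One trade-off is that the pattern only keeps ASCII letters and digits, so accented characters would also become underscores.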

I applied the transform_column_names function to every column name in the DataFrame (df) and then displayed all the columns to confirm the transformation. The result is a DataFrame with standardized column names.

In [9]:
raw_data_transformed = transform_column_names(df)

raw_data_transformed.columns


Index(['row_id', 'order_id', 'order_date', 'ship_date', 'ship_mode',
       'customer_id', 'customer_name', 'segment', 'city', 'state', 'country',
       'postal_code', 'market', 'region', 'product_id', 'category',
       'sub_category', 'product_name', 'sales', 'quantity', 'discount',
       'profit', 'shipping_cost', 'order_priority'],
      dtype='object')
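`transform_column_names` assigns to `df.columns`, so it renames the columns of the DataFrame passed in as well as returning it. After this cell, `df` and `raw_data_transformed` are the same object. If you would rather keep the original headers for comparison, the sketch below is a non-mutating alternative using `DataFrame.rename` with a callable. `transform_column_names_copy` is a hypothetical name.

```python
import pandas as pd


def transform_column_names_copy(frame: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical variant: return a renamed copy and leave `frame` untouched."""
    return frame.rename(columns=lambda c: c.strip().lower().replace(" ", "_").replace("-", "_"))


original = pd.DataFrame(columns=["Row ID", "Sub-Category"])
renamed = transform_column_names_copy(original)
print(list(original.columns), list(renamed.columns))
```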

## Data Normalization


The next step is to break the main DataFrame (raw_data_transformed) into four distinct DataFrames: products, customers, locations, and orders. I defined the normalize_data function to do this. It splits the main DataFrame into these four subsets, removes duplicate rows from each, and prints progress messages so I can track the normalization process.
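To see what the split does on a small scale, the sketch below applies the same idea to a few made-up rows. It selects the columns that describe one entity, then calls `drop_duplicates()` so that each entity appears once. The data and the reduced column subset are illustrative only.

```python
import pandas as pd

# made-up rows: order O-1 contains two products bought by the same customer
flat = pd.DataFrame({
    "row_id": [1, 2, 3],
    "order_id": ["O-1", "O-1", "O-2"],
    "customer_id": ["C-1", "C-1", "C-2"],
    "customer_name": ["Customer A", "Customer A", "Customer B"],
    "product_id": ["P-1", "P-2", "P-1"],
    "product_name": ["Chair", "Phone", "Chair"],
})

# each dimension table keeps only its own columns; repeated rows collapse to one
customers = flat[["customer_id", "customer_name"]].drop_duplicates()
products = flat[["product_id", "product_name"]].drop_duplicates()
# the orders table keeps the ids that link back to the dimension tables
orders = flat[["row_id", "order_id", "customer_id", "product_id"]]

print(len(flat), len(customers), len(products), len(orders))
```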

In [20]:
# Normalization ----> breaking it all apart: split the table into the 4 subsets below, choose the primary key in each subset and then remove exact duplicates

# products
# customers
# locations
# orders

def normalize_data(df: pd.DataFrame) -> list:
    """Normalize the data into 4 DataFrames (i.e. 4 tables): products, customers, locations and orders."""
    print(f"Normalization in progress.....current length = {len(df)}")

    products_df = df[['product_id', 'category', 'sub_category', 'product_name']]
    products_df_clean = products_df.drop_duplicates()
    print(f"Done transforming Products ---> current length = {len(products_df_clean)}")

    customers_df = df[['customer_id', 'customer_name']]
    customers_df_clean = customers_df.drop_duplicates()
    print(f"Done transforming customers ---> current length = {len(customers_df_clean)}")

    locations_df = df[['city', 'state', 'country', 'market']]
    locations_df_clean = locations_df.drop_duplicates()
    print(f"Done transforming locations ---> current length = {len(locations_df_clean)}")

    # The next step is to create orders_df and remove duplicates based on a primary key.
    # order_id is not a suitable primary key for orders_df because it is not unique:
    # each order can contain several products, so the same order_id appears on several rows.
    # Instead, I drop duplicates based on row_id, which serves as the primary key.
    # orders_df is built from the df argument so the function does not depend on global variables.
    orders_df = df.drop(columns=['category', 'sub_category', 'product_name', 'customer_name', 'state', 'city'])
    print(f'Length of orders_df: {len(orders_df)}')

    orders_df_clean = orders_df.drop_duplicates(subset=['row_id'])
    print(f"Done transforming orders ---> current length = {len(orders_df_clean)}")

    return [products_df_clean, customers_df_clean, locations_df_clean, orders_df_clean]
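The comments in normalize_data argue that order_id cannot be the primary key of orders_df, because one order can include several products. That claim is easy to check before choosing a key: a valid primary key has no missing values and no repeats. Here is a minimal sketch with a hypothetical `is_valid_primary_key` helper and made-up rows:

```python
import pandas as pd


def is_valid_primary_key(frame: pd.DataFrame, columns: list) -> bool:
    """Hypothetical helper: True if `columns` are never null and identify each row uniquely."""
    keys = frame[columns]
    return bool(keys.notna().all().all() and not keys.duplicated().any())


orders = pd.DataFrame({
    "row_id": [101, 102, 103],
    "order_id": ["O-1", "O-1", "O-2"],  # O-1 contains two products
    "product_id": ["P-1", "P-2", "P-1"],
})

print(is_valid_primary_key(orders, ["order_id"]))
print(is_valid_primary_key(orders, ["row_id"]))
print(is_valid_primary_key(orders, ["order_id", "product_id"]))
```

The last check shows that a composite key of order_id and product_id also identifies these sample rows. Whether it holds on the full dataset would need to be verified, since the same product could appear twice in one order.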



## Writing Data to CSV Files

I then wrote my transformed DataFrames to CSV files in the outputs/models directory (my intermediate storage) and added a print statement to confirm that the files were saved successfully.

In [11]:
def write_to_csv(list_of_dfs: list):
    """Write the products, customers, locations and orders DataFrames (in that order) to CSV files."""

    output_dir = "outputs/models/"
    # create the output directory if it does not exist yet; to_csv will not create it
    os.makedirs(output_dir, exist_ok=True)

    # file names follow the order of the list returned by normalize_data
    table_names = ["products", "customers", "locations", "orders"]
    for table_name, table_df in zip(table_names, list_of_dfs):
        table_df.to_csv(f"{output_dir}{table_name}.csv", index=False)

    print(f"{len(list_of_dfs)} files written to intermediate storage.")
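A quick way to confirm the intermediate files are usable is a round trip: write each DataFrame, read it back, and compare. The sketch below does this in a temporary directory with pathlib, so it does not touch outputs/models. The helper name `write_tables` and its dict-based interface are assumptions, not part of the pipeline above.

```python
import tempfile
from pathlib import Path

import pandas as pd


def write_tables(tables: dict, output_dir: Path) -> list:
    """Hypothetical helper: write each {table_name: DataFrame} pair to <output_dir>/<table_name>.csv."""
    output_dir.mkdir(parents=True, exist_ok=True)  # create any missing folders first
    paths = []
    for name, frame in tables.items():
        path = output_dir / f"{name}.csv"
        frame.to_csv(path, index=False)
        paths.append(path)
    return paths


tables = {
    "customers": pd.DataFrame({"customer_id": ["C-1", "C-2"], "customer_name": ["A", "B"]}),
}
with tempfile.TemporaryDirectory() as tmp:
    written = write_tables(tables, Path(tmp) / "outputs" / "models")
    file_names = [p.name for p in written]
    # read the file back while the temporary directory still exists
    round_trip = pd.read_csv(written[0])

print(file_names)
print(round_trip.to_dict("list"))
```

Pairing each DataFrame with its table name avoids relying on list positions to decide which file a DataFrame is written to.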


## Loading CSV Files into the ElephantSQL Database with an Upload Timestamp Column

Now that my normalized subsets are in intermediate storage (i.e. "outputs/models/"), the next step is to load them into my final storage, which in this instance is an ElephantSQL (PostgreSQL) database. First, however, I need to add a crucial column to each table. This is a timestamp column called "_elt_loaded_at" that records when the data was loaded into the database. The timestamped DataFrames are then loaded into the database immediately.

To achieve this, I defined two functions. The first adds the timestamp column to a DataFrame. The second reads a CSV file from 'outputs/models/{table}.csv', calls the first function to add the timestamp column, and writes the data to the specified database table, replacing the table if it already exists. It connects using the database URL read from the 'POSTGRES_URL' environment variable.

1st function

In [23]:
def update_dataframe(original_data: pd.DataFrame) -> pd.DataFrame:
    """This function adds a timestamp column to a copy of the existing
    DataFrame and returns the new DataFrame."""
    # get the current time with the datetime module imported above and format it as YYYY-MM-DD HH:MM:SS
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # assign returns a new DataFrame with an "_elt_loaded_at" column in which every row holds current_time,
    # leaving original_data unchanged
    return original_data.assign(_elt_loaded_at=current_time)
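Format strings are easy to get wrong. In `strftime`, `%m` is the month and `%d` is the day, so the order `'%Y-%m-%d'` is needed for a year-month-day date. The sketch below pins the clock to a fixed, made-up moment so that the output can be checked. It also uses `DataFrame.assign`, which returns a new DataFrame and leaves the input unchanged. `stamp` is a hypothetical helper.

```python
from datetime import datetime

import pandas as pd


def stamp(frame: pd.DataFrame, moment: datetime) -> pd.DataFrame:
    """Hypothetical helper: return a copy of `frame` with an _elt_loaded_at column set to `moment`."""
    return frame.assign(_elt_loaded_at=moment.strftime("%Y-%m-%d %H:%M:%S"))


moment = datetime(2024, 3, 7, 14, 5, 9)  # a fixed, made-up moment: 7 March 2024
print(moment.strftime("%Y-%m-%d %H:%M:%S"))  # 2024-03-07 14:05:09 (year-month-day)
print(moment.strftime("%Y-%d-%m %H:%M:%S"))  # 2024-07-03 14:05:09 (reads as 3 July)

data = pd.DataFrame({"customer_id": ["C-1", "C-2"]})
stamped = stamp(data, moment)
print(list(data.columns), list(stamped.columns))
```

Storing a datetime value (for example `pd.Timestamp.now()`) rather than a formatted string would let `to_sql` create a timestamp column instead of a text column. Which form suits the downstream queries is a design choice.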

2nd function

In [31]:
def load_data_to_db(table:str):
    """
    Loads data from a CSV file into a specified table in a database, with an added timestamp column.

    This function performs the following steps:
    1. Reads a CSV file corresponding to the given table name from the 'outputs/models' directory into a DataFrame.
    2. Adds a timestamp column to this DataFrame, marking the time at which the data was processed.
    3. Connects to a database using the URL specified in the 'POSTGRES_URL' environment variable.
    4. Writes the DataFrame to the specified table in the database. If the table already exists, it is replaced.
    5. Closes the database connection and prints a confirmation message.

    Parameters:
    table (str): The name of the table, which is also used to identify the CSV file to be loaded (assumed to be 'table.csv' in 'outputs/models').

    Note:
    The function requires 'pd', 'create_engine' from 'sqlalchemy', and 'datetime' to be imported and 'pg_url' to be defined prior to its call.
    The function assumes that the environment variable 'POSTGRES_URL' is set and accessible.
    """
    df_without_timestamp = pd.read_csv(f"outputs/models/{table}.csv")

    timestamped_dataframe = update_dataframe(original_data=df_without_timestamp)
    engine = create_engine(pg_url)
    # engine.begin() opens a connection inside a transaction, commits it on success and closes the connection on exit
    with engine.begin() as connection:
        timestamped_dataframe.to_sql(table, con=connection, if_exists='replace', index=False)
    print(f"Data loaded to {table} table.")
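`if_exists='replace'` drops the table and recreates it on every run. This makes reloading idempotent: running the pipeline twice leaves one copy of the data rather than two. The sketch below demonstrates this with an in-memory SQLite database standing in for PostgreSQL (pandas accepts a `sqlite3` connection directly). The table and rows are made up.

```python
import sqlite3

import pandas as pd

# in-memory SQLite stands in for the PostgreSQL database in this sketch
connection = sqlite3.connect(":memory:")
products = pd.DataFrame({"product_id": ["P-1", "P-2"], "category": ["Furniture", "Technology"]})

for run in range(2):  # simulate running the pipeline twice with 'replace'
    products.to_sql("products", con=connection, if_exists="replace", index=False)
replace_count = connection.execute("SELECT COUNT(*) FROM products").fetchone()[0]

for run in range(2):  # 'append' adds the rows again on every run
    products.to_sql("products_appended", con=connection, if_exists="append", index=False)
append_count = connection.execute("SELECT COUNT(*) FROM products_appended").fetchone()[0]

print(replace_count, append_count)
connection.close()
```

The trade-off is that 'replace' also discards anything defined on the table outside pandas, such as constraints, indexes or column defaults, because the table is recreated from the DataFrame's columns.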

# Note that the timestamp is added to the DataFrame just before it is written to the database. In a fast, uncongested environment, this keeps the delay between generating the timestamp and loading the timestamped DataFrame into the database small.

# However, delays can still occur because of large data volumes, database server load, network latency or server-side processing. In these cases, database-level timestamping (for example, a column default of now() in PostgreSQL) is preferable.
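Database-level timestamping means letting the database fill the column itself through a column default. The sketch below again uses in-memory SQLite, where the default is `CURRENT_TIMESTAMP`. In PostgreSQL the equivalent would be a column declared with `DEFAULT now()`. The table is created once up front, and rows are then appended without an `_elt_loaded_at` value, so the database supplies it. Appending rather than using `if_exists='replace'` matters here, because replacing would recreate the table without the default. Table and rows are made up.

```python
import sqlite3

import pandas as pd

connection = sqlite3.connect(":memory:")
# the database, not Python, fills _elt_loaded_at when a row is inserted without it
connection.execute(
    """
    CREATE TABLE customers (
        customer_id TEXT PRIMARY KEY,
        customer_name TEXT,
        _elt_loaded_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
    """
)

customers = pd.DataFrame({"customer_id": ["C-1", "C-2"], "customer_name": ["A", "B"]})
# append into the existing table so its DEFAULT clause is kept
customers.to_sql("customers", con=connection, if_exists="append", index=False)

rows = connection.execute("SELECT customer_id, _elt_loaded_at FROM customers").fetchall()
print(rows)
connection.close()
```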
 

## Bringing It Together: An Excel-to-Database ETL Pipeline

In [2]:
if __name__ == "__main__":

    # load data
    raw_data = load_excel_data(data_source=file_path, sheet_name="Orders")
    raw_data_transformed = transform_column_names(raw_data)

    # normalize data
    normalized_datasets_list = normalize_data(df=raw_data_transformed)

    # write to csv
    write_to_csv(normalized_datasets_list)

    # load data to db
    files_to_load = ["products", "customers", "locations", "orders"]
    for file in files_to_load:
        load_data_to_db(table=file)
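After the load loop finishes, a simple reconciliation step can confirm that each table in the database holds as many rows as the CSV file it came from. The sketch below assumes a DB-API connection and uses `sqlite3` with made-up data. With the pipeline above, you would pass the row counts of the CSV files and a connection to the PostgreSQL database instead. `check_row_counts` is a hypothetical name.

```python
import sqlite3

import pandas as pd


def check_row_counts(connection, expected: dict) -> dict:
    """Hypothetical helper: return {table: (expected, actual)} for every table whose counts differ."""
    mismatches = {}
    for table, expected_rows in expected.items():
        cursor = connection.cursor()
        # table names come from our own list, not user input, so formatting them in is acceptable here
        cursor.execute(f'SELECT COUNT(*) FROM "{table}"')
        actual_rows = cursor.fetchone()[0]
        cursor.close()
        if actual_rows != expected_rows:
            mismatches[table] = (expected_rows, actual_rows)
    return mismatches


connection = sqlite3.connect(":memory:")
pd.DataFrame({"product_id": ["P-1", "P-2"]}).to_sql("products", con=connection, index=False)
pd.DataFrame({"customer_id": ["C-1"]}).to_sql("customers", con=connection, index=False)

report = check_row_counts(connection, {"products": 2, "customers": 2})
print(report)
connection.close()
```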
    
    

## The Analytics Tables

Four tables were loaded into our database (kindly refer to snapshots_of_table_in_db). However, the company also needs insights into its best-selling items, top clients and leading locations.

This is achieved by creating the following analytics tables:

- Top 100 products
- Top 20 customers
- Top 3 markets
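This section does not specify how "top" is measured, so the sketch below assumes total sales as the ranking metric. It works on a few made-up order rows that use the normalized column names. The helper groups by the relevant key, sums sales and keeps the largest groups with `nlargest`. On the real orders table the limits would be 100, 20 and 3. `top_n_by_sales` is a hypothetical helper.

```python
import pandas as pd


def top_n_by_sales(orders: pd.DataFrame, key: str, n: int) -> pd.DataFrame:
    """Hypothetical helper: total sales per `key`, keeping the n largest totals."""
    totals = orders.groupby(key, as_index=False)["sales"].sum()
    return totals.nlargest(n, "sales").reset_index(drop=True)


# made-up rows for illustration only
orders = pd.DataFrame({
    "product_id": ["P-1", "P-2", "P-1", "P-3"],
    "customer_id": ["C-1", "C-2", "C-2", "C-3"],
    "market": ["US", "APAC", "APAC", "EU"],
    "sales": [100.0, 250.0, 50.0, 80.0],
})

top_products = top_n_by_sales(orders, "product_id", n=2)
top_markets = top_n_by_sales(orders, "market", n=1)
print(top_products)
print(top_markets)
```

The resulting DataFrames could then be written with `to_sql` like the other tables. Ties at the cut-off are resolved by `nlargest`'s default `keep='first'`.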

