### Task 1: Data Modeling & ETL

1. Database Setup
Imagine you’re building a dashboard to track orders, customers, and inventory for a food
delivery business.
Create a simplified schema that includes the following tables:
- `Customers`: customer_id, name, registration_date, and region.
- `Orders`: order_id, customer_id, order_date, total_amount.
- `Items`: item_id, item_name, item_price, and inventory_stock.
- `Order_Items`: order_id, item_id, quantity.

Deliverable
- Provide SQL code for creating these tables.
- Write a short paragraph explaining your rationale for the structure.

### Answer

The schema is deliberately simple and normalized to avoid redundant data. Each table has a primary key that uniquely identifies its records, and foreign keys link `Orders` to `Customers` and `Order_Items` to `Orders` and `Items`, which keeps the data consistent. `Order_Items` uses a composite primary key (`order_id`, `item_id`), so an item can appear only once per order. One possible refinement would be to give `Order_Items` a surrogate primary key: other tables could then reference a line item through a single column, although a unique constraint on (`order_id`, `item_id`) would still be needed to prevent duplicate rows.

In [13]:
sql = """
-- Create Customers table 
CREATE TABLE Customers (
    customer_id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    registration_date DATE,
    region VARCHAR(50)
);

-- Create Orders table 
CREATE TABLE Orders (
    order_id SERIAL PRIMARY KEY,
    customer_id INT,
    order_date DATE,
    total_amount DECIMAL(10, 2),
    FOREIGN KEY (customer_id) REFERENCES Customers(customer_id)
);

-- Create Items table 
CREATE TABLE Items (
    item_id SERIAL PRIMARY KEY,
    item_name VARCHAR(100),
    item_price DECIMAL(10, 2),
    inventory_stock INT
);

-- Create Order_Items table 
CREATE TABLE Order_Items (
    order_id INT,
    item_id INT,
    quantity INT,
    PRIMARY KEY (order_id, item_id),
    FOREIGN KEY (order_id) REFERENCES Orders(order_id),
    FOREIGN KEY (item_id) REFERENCES Items(item_id)
);
"""

In [None]:
import psycopg2
from dotenv import load_dotenv
import os

#Loading environment variables from .env file
load_dotenv()

#SQL query to check existing tables
check_tables_sql = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';"

#Establish connection to Amazon RDS Database
try:
    
    connection = psycopg2.connect(
        host=os.getenv("DB_HOST"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        port=os.getenv("DB_PORT"),
        dbname=os.getenv("DB_NAME")
    )
    connection.autocommit = True  
    
    cursor = connection.cursor()

    cursor.execute(sql)
    print("Tables created successfully!")

    cursor.execute(check_tables_sql)
    tables = cursor.fetchall()

    print("PostgreSQL database tables:")
    for table in tables:
        print(table[0])

    cursor.close()
    connection.close()

except Exception as e:
    print(f"Error: {e}")


Tables created successfully!
PostgreSQL database tables:
customers
orders
order_items
items


In [None]:
import psycopg2
from faker import Faker
import os
from dotenv import load_dotenv
import random

load_dotenv()

connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

#Initialize faker for creating fake data
fake = Faker()

num_records = 30

# List of food items to use for item_name
food_items = [
    "Apple", "Banana", "Carrot", "Tomato", "Cucumber", "Chicken Breast", "Lettuce", "Onion", "Rice", "Pasta",
    "Steak", "Salmon", "Spinach", "Potato", "Garlic", "Broccoli", "Mango", "Grapes", "Avocado", "Cheese",
    "Yogurt", "Milk", "Butter", "Eggs", "Peanut Butter", "Honey", "Olive Oil", "Bread", "Cereal", "Tuna",
    "Turkey", "Lamb", "Zucchini", "Pumpkin", "Beef", "Sausage", "Pineapple", "Kiwi", "Peach", "Pomegranate"
]

#Create lists to store valid customer_ids, order_ids, and item_ids
customer_ids = []
order_ids = []
item_ids = []

# Create a set to track (order_id, item_id) combinations to prevent duplicates
order_item_pairs = set()

#Insert data into 'Customers' table
for _ in range(num_records):
    name = fake.name()
    registration_date = fake.date_this_decade()  

    #Randomly insert NULL for region
    region = fake.city() if random.choice([True, False]) else None

    insert_query = """
    INSERT INTO Customers (name, registration_date, region)
    VALUES (%s, %s, %s) RETURNING customer_id;
    """
    cursor.execute(insert_query, (name, registration_date, region))
    customer_id = cursor.fetchone()[0]  
    customer_ids.append(customer_id)  

#Insert data into 'Items' table
for _ in range(num_records):
    item_name = random.choice(food_items)  
    
    item_price = round(fake.random_number(digits=2), 2)  

    #Randomly insert NULL for inventory_stock
    inventory_stock = fake.random_int(min=0, max=100) if random.choice([True, False]) else None

    insert_query = """
    INSERT INTO Items (item_name, item_price, inventory_stock)
    VALUES (%s, %s, %s) RETURNING item_id;
    """
    cursor.execute(insert_query, (item_name, item_price, inventory_stock))
    item_id = cursor.fetchone()[0]  
    item_ids.append(item_id)  

#Insert data into 'Orders' table
for _ in range(num_records):
    customer_id = random.choice(customer_ids)  
    order_date = fake.date_this_year() 

    #Randomly insert NULL for total_amount
    total_amount = round(fake.random_number(digits=2), 2) if random.choice([True, False]) else None

    insert_query = """
    INSERT INTO Orders (customer_id, order_date, total_amount)
    VALUES (%s, %s, %s) RETURNING order_id;
    """
    cursor.execute(insert_query, (customer_id, order_date, total_amount))
    order_id = cursor.fetchone()[0]  
    order_ids.append(order_id)  

#Insert data into 'Order_Items' table 
for _ in range(num_records):
    order_id = random.choice(order_ids)  
    item_id = random.choice(item_ids)  

    while (order_id, item_id) in order_item_pairs:
        order_id = random.choice(order_ids)  
        item_id = random.choice(item_ids)    

    order_item_pairs.add((order_id, item_id))  
    
    quantity = fake.random_int(min=1, max=5)

    insert_query = """
    INSERT INTO Order_Items (order_id, item_id, quantity)
    VALUES (%s, %s, %s);
    """
    cursor.execute(insert_query, (order_id, item_id, quantity))

connection.commit()

cursor.close()
connection.close()

print(f"{num_records} records inserted into all tables successfully.")

30 records inserted into all tables successfully.


### ETL Process

Assume you’re receiving daily files with new orders and customer updates. Write a Python
script that would:
- Load new data from a CSV file into a staging table.
- Update the main tables (`Customers`, `Orders`, `Order_Items`) based on this new data.

Deliverable
- Python code demonstrating the ETL process.
- Brief explanation of your approach to handling updates, deletions, and new entries.

### Answer

The script below reads the CSV file into a pandas DataFrame, which serves as an in-memory staging area, with a single line of code:

df = pd.read_csv(file_path)

Once the data is in a DataFrame, the remaining steps (extracting the columns that belong to each table and inserting them into the database) are automated in Python.

Because the CSV file is expected to be of manageable size (less than a few GB), it can be processed entirely in memory. The script splits the DataFrame into one subset per table and bulk-loads each subset with a single `COPY`, which avoids the overhead of issuing one `INSERT` per row. As written, the script only appends rows: a row whose primary key already exists in the target table makes the load fail, so updates and deletions are not handled.
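One way to cover updates and new entries in the same pass is to load the file into a staging table and then upsert from it into the main table. The sketch below is a minimal illustration under stated assumptions: it uses Python's built-in `sqlite3` as a stand-in for PostgreSQL so that it runs without a server, the table `customers_staging` and the function `upsert_customers` are hypothetical names, and the sample rows are made up. The `INSERT ... ON CONFLICT ... DO UPDATE` statement has the same shape in PostgreSQL.

```python
import csv
import io
import sqlite3

def upsert_customers(conn, csv_text):
    """Load CSV rows into a staging table, then upsert them into customers."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    cur = conn.cursor()
    # Start every run from an empty staging table
    cur.execute("DELETE FROM customers_staging")
    cur.executemany(
        "INSERT INTO customers_staging (customer_id, name, registration_date, region) "
        "VALUES (:customer_id, :name, :registration_date, :region)",
        rows,
    )
    # New customer_ids are inserted; existing ones are updated in place.
    # "WHERE true" is required by SQLite's parser for INSERT ... SELECT upserts.
    cur.execute("""
        INSERT INTO customers (customer_id, name, registration_date, region)
        SELECT customer_id, name, registration_date, region
        FROM customers_staging
        WHERE true
        ON CONFLICT (customer_id) DO UPDATE SET
            name = excluded.name,
            registration_date = excluded.registration_date,
            region = excluded.region
    """)
    conn.commit()

# Hypothetical sample data: one existing customer in the main table
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (
        customer_id INTEGER PRIMARY KEY, name TEXT, registration_date TEXT, region TEXT);
    CREATE TABLE customers_staging (
        customer_id INTEGER, name TEXT, registration_date TEXT, region TEXT);
    INSERT INTO customers VALUES (1, 'Ana', '2024-01-05', 'North');
""")

daily_file = (
    "customer_id,name,registration_date,region\n"
    "1,Ana,2024-01-05,South\n"   # existing customer whose region changed
    "2,Ben,2024-11-01,East\n"    # new customer
)
upsert_customers(conn, daily_file)
customers = conn.execute(
    "SELECT customer_id, name, region FROM customers ORDER BY customer_id"
).fetchall()
print(customers)
```

Deletions are not covered by this sketch. They would need an explicit signal in the daily file, for example a deletion flag, which the task description does not specify.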


In [None]:
import psycopg2
import pandas as pd
from dotenv import load_dotenv
import os
from io import StringIO

load_dotenv()

connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

#Function to load CSV into a Pandas DataFrame
def load_csv_to_dataframe(csv_file_path):
    return pd.read_csv(csv_file_path)

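#Function to insert data into PostgreSQL using pandas DataFrame
def insert_data_from_df(df, table_name):

    #Drop repeated rows (e.g. the same customer appearing on several order lines)
    df = df.drop_duplicates()

    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)

    #COPY in CSV format so quoted commas are parsed correctly and
    #empty fields are loaded as NULL; the column list is explicit
    columns = ', '.join(df.columns)
    cursor.copy_expert(
        f"COPY {table_name} ({columns}) FROM STDIN WITH (FORMAT csv)",
        buffer
    )
    connection.commit()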

#Function to process and insert data into the respective tables
def process_and_insert_data(csv_file_path):
   
    df = load_csv_to_dataframe(csv_file_path)

    if {'customer_id', 'name', 'registration_date', 'region'}.issubset(df.columns):
        customers_df = df[['customer_id', 'name', 'registration_date', 'region']]
        insert_data_from_df(customers_df, 'customers')

    if {'order_id', 'customer_id', 'order_date', 'total_amount'}.issubset(df.columns):
        orders_df = df[['order_id', 'customer_id', 'order_date', 'total_amount']]
        insert_data_from_df(orders_df, 'orders')
    
    if {'item_id', 'item_name', 'item_price', 'inventory_stock'}.issubset(df.columns):
        items_df = df[['item_id', 'item_name', 'item_price', 'inventory_stock']]
        insert_data_from_df(items_df, 'items')

    if {'order_id', 'item_id', 'quantity'}.issubset(df.columns):
        order_items_df = df[['order_id', 'item_id', 'quantity']]
        insert_data_from_df(order_items_df, 'order_items')

    print("ETL process completed successfully.")

# Define the path to the CSV file
csv_file_path = 'path_to_combined_csv.csv'

# Execute the ETL process
process_and_insert_data(csv_file_path)

# Close the cursor and connection
cursor.close()
connection.close()


### Task 2: Data Analysis and Dashboarding

1. KPIs and Metrics
Based on the schema from Task 1, calculate the following metrics:
- Daily Total Revenue: Total amount of orders per day.
- Average Order Value (AOV): Average order amount over the last 30 days.
- Monthly Active Customers: Count of unique customers who placed an order within the last
30 days.
- Top Selling Items: The 5 most ordered items in the last 30 days.

Deliverable
- SQL queries for each metric.
- Short explanation of each metric’s relevance to the business.

Answer:

For this task, I chose to compute the metrics with SQL queries instead of fetching all the data and analyzing it in pandas. Pandas would be fine for a database as small as this example, but on a larger database loading every table into a DataFrame would not be feasible, so I opted for the more practical approach of one SQL query per metric.

#### Daily Total Revenue:

In [None]:
import psycopg2
import pandas as pd
from dotenv import load_dotenv
import os

load_dotenv()

connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

query = """
    SELECT
        order_date,
        SUM(total_amount) AS total_revenue
    FROM
        Orders
    GROUP BY
        order_date
    ORDER BY
        order_date;
"""

cursor.execute(query)

#Fetch the result and convert to DataFrame
daily_revenue = pd.DataFrame(cursor.fetchall(), columns=['order_date', 'total_revenue'])

print(daily_revenue)

print(f"\nThe cumulative revenue across all days is: {daily_revenue['total_revenue'].sum()}")

cursor.close()
connection.close()


    order_date total_revenue
0   2024-01-13         96.00
1   2024-01-19         84.00
2   2024-01-28          None
3   2024-02-01         39.00
4   2024-02-03         72.00
5   2024-02-29         91.00
6   2024-03-16         66.00
7   2024-04-26         84.00
8   2024-05-12          None
9   2024-05-14         68.00
10  2024-05-16         85.00
11  2024-05-18         63.00
12  2024-05-27         37.00
13  2024-06-14         84.00
14  2024-06-18          None
15  2024-06-28         63.00
16  2024-08-03         76.00
17  2024-08-13          None
18  2024-08-25         81.00
19  2024-09-09          7.00
20  2024-09-12          None
21  2024-09-18          None
22  2024-10-06          None
23  2024-10-12          None
24  2024-10-13         71.00
25  2024-10-16          None
26  2024-10-28          None
27  2024-11-12         40.00

The cumulative revenue across all days is: 1207.00


#### Average Order Value (AOV):

Answer: 

This took a few exploratory queries to understand the data. Once I saw that the `total_amount` column contains many NULL values and could not be used reliably, I derived each order's total from its line items instead, as the sum of `quantity * item_price`.
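The derivation of an order total from its line items can be checked on a tiny example. The sketch below assumes made-up sample rows and uses Python's built-in `sqlite3` as a stand-in for PostgreSQL; the 30-day date filter is left out to keep it short.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Items (item_id INTEGER PRIMARY KEY, item_name TEXT, item_price REAL);
    CREATE TABLE Order_Items (order_id INTEGER, item_id INTEGER, quantity INTEGER);
    INSERT INTO Items VALUES (1, 'Peach', 2.0), (2, 'Eggs', 3.0);
    -- Order 10: 2 x Peach + 1 x Eggs; order 11: 4 x Eggs
    INSERT INTO Order_Items VALUES (10, 1, 2), (10, 2, 1), (11, 2, 4);
""")

# Per-order total = sum of quantity * item_price over the order's line items
order_totals = conn.execute("""
    SELECT oi.order_id, SUM(oi.quantity * i.item_price) AS order_total
    FROM Order_Items oi
    JOIN Items i ON oi.item_id = i.item_id
    GROUP BY oi.order_id
    ORDER BY oi.order_id
""").fetchall()

# Average order value = mean of the per-order totals
aov = sum(total for _, total in order_totals) / len(order_totals)
print(order_totals, aov)
```

Here order 10 totals 2 * 2.0 + 1 * 3.0 = 7.0 and order 11 totals 4 * 3.0 = 12.0, so the average order value is 9.5.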

In [None]:
import psycopg2
import pandas as pd
from dotenv import load_dotenv
import os

load_dotenv()

connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

#SQL query for viewing order_date
query = """
SELECT order_date FROM Orders LIMIT 10;

"""

#Execute the query
cursor.execute(query)

#Fetch the results
result = cursor.fetchall()

print(result)

#Close the cursor and connection
cursor.close()
connection.close()

[(datetime.date(2024, 10, 16),), (datetime.date(2024, 6, 14),), (datetime.date(2024, 10, 13),), (datetime.date(2024, 5, 16),), (datetime.date(2024, 2, 29),), (datetime.date(2024, 9, 18),), (datetime.date(2024, 4, 26),), (datetime.date(2024, 10, 12),), (datetime.date(2024, 8, 13),), (datetime.date(2024, 9, 12),)]


In [None]:
connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

#SQL query for viewing total amount
query = """
SELECT total_amount FROM Orders LIMIT 10;
"""

cursor.execute(query)

result = cursor.fetchall()

print(result)

cursor.close()
connection.close()

[(None,), (Decimal('84.00'),), (Decimal('71.00'),), (Decimal('85.00'),), (Decimal('91.00'),), (None,), (Decimal('84.00'),), (None,), (None,), (None,)]


In [None]:
connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

#SQL query for calculating the Average Order Value and the average daily order count over the last 30 days
query = """
SELECT 
    AVG(order_totals.order_total) AS average_order_value,
    COUNT(order_totals.order_id) / 30.0 AS average_daily_order_count
FROM (
    -- Subquery to calculate total value of each order placed in the last 30 days
    SELECT 
        o.order_id,
        SUM(oi.quantity * i.item_price) AS order_total
    FROM Orders o
    INNER JOIN Order_Items oi ON o.order_id = oi.order_id
    INNER JOIN Items i ON oi.item_id = i.item_id
    WHERE o.order_date >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY o.order_id
) AS order_totals;
"""

cursor.execute(query)

result = cursor.fetchall()

#Assign each value to respective variable for printing.
average_order_value, average_daily_order_count = result[0]

print(f"Average Order Value (AOV) over the last 30 days: {float(average_order_value)}, and the average order count over the last 30 days is: {average_daily_order_count:.3f}")

cursor.close()
connection.close()

Average Order Value (AOV) over the last 30 days: 351.5, and the average order count over the last 30 days is: 0.067


#### Monthly Active Customers:

Answer:

This can be computed with `COUNT(DISTINCT customer_id)` on the `Orders` table and a `WHERE` clause that restricts `order_date` to the last 30 days.

In [None]:
connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

#SQL query for calculating Monthly Active Customers
query = """
SELECT 
    COUNT(DISTINCT o.customer_id) AS monthly_active_customers
FROM Orders o
WHERE o.order_date >= CURRENT_DATE - INTERVAL '30 days';
"""

cursor.execute(query)

result = cursor.fetchall()

print(f"Monthly Active Customers over the last 30 days: {result[0][0]}")

cursor.close()
connection.close()

Monthly Active Customers over the last 30 days: 3


#### Top Selling Items: The 5 most ordered items in the last 30 days

Answer: 

Although the database contains many different item names, only 3 items were ordered in the last 30 days, so the result has fewer than 5 rows. The SQL query uses two inner joins to combine the order date from the `Orders` table, the quantity from the `Order_Items` table, and the item name from the `Items` table. The results are then converted into a pandas DataFrame for easier reading.

In [18]:
import pandas as pd

connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

#SQL query for calculating Top Selling Items
query = """
SELECT 
    i.item_name,
    SUM(oi.quantity) AS total_quantity_sold
FROM Order_Items oi
INNER JOIN Items i ON oi.item_id = i.item_id
INNER JOIN Orders o ON oi.order_id = o.order_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY i.item_name
ORDER BY total_quantity_sold DESC
LIMIT 5;
"""

cursor.execute(query)

result = cursor.fetchall()

#Fetching column names from the cursor description
columns = [desc[0] for desc in cursor.description]

# Converting to a DataFrame
df = pd.DataFrame(result, columns=columns)

print(df)

cursor.close()
connection.close()


  item_name  total_quantity_sold
0     Peach                    5
1      Eggs                    3
2     Mango                    2


### Task 3: Data Quality and Troubleshooting

1. Data Quality Checks
Implement SQL queries to perform the following data quality checks:
- Check for any `NULL` values in critical columns (e.g., `order_id`, `customer_id`,
`total_amount`).
- Identify orders with unusually high or low `total_amount` values.
- Ensure all `order_id` in `Order_Items` exist in the `Orders` table (foreign key integrity).

Deliverable
- SQL queries for each data quality check.
- Brief explanation of how you would automate these checks for daily monitoring.

Answer:

First, I check for NULL values in the critical columns of the `Orders` table and display every affected row in a single pandas DataFrame.

In [23]:
connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

#SQL query for checking for NULL values in critical columns in Orders table
query = """ 
SELECT order_id, customer_id, total_amount
FROM Orders
WHERE order_id IS NULL
   OR customer_id IS NULL
   OR total_amount IS NULL;
"""

cursor.execute(query)

result = cursor.fetchall()

#Fetching column names from the cursor description
columns = [desc[0] for desc in cursor.description]

# Converting to a DataFrame
df = pd.DataFrame(result, columns=columns)

print(df)

cursor.close()
connection.close()

    order_id  customer_id total_amount
0        182          225         None
1        187          240         None
2        189          227         None
3        190          222         None
4        191          236         None
5        192          217         None
6        194          229         None
7        198          217         None
8        199          237         None
9        204          239         None
10       208          237         None
11       211          213         None


To identify unusual values, I first computed an expected total for each order as the sum of `quantity * item_price` over its line items, because the `total_amount` column contains many NULL values. Then I calculated the average (`avg_total`) and standard deviation (`stddev_total`) of these expected totals. Any order whose expected total lies more than 2 standard deviations from the average is flagged as unusually high or low. The flagged orders are displayed in the DataFrame below.
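The flagging rule can be illustrated in plain Python. This is a minimal sketch with made-up order totals; `flag_outliers` is a hypothetical helper. It uses `statistics.stdev`, the sample standard deviation, which is also what PostgreSQL's `STDDEV` computes.

```python
from statistics import mean, stdev

def flag_outliers(order_totals, k=2.0):
    """Return order_ids whose total lies more than k sample standard deviations from the mean."""
    values = list(order_totals.values())
    avg_total = mean(values)
    stddev_total = stdev(values)  # sample standard deviation
    low = avg_total - k * stddev_total
    high = avg_total + k * stddev_total
    return [oid for oid, total in order_totals.items() if total < low or total > high]

# Hypothetical totals keyed by order_id; order 10 is far above the rest
order_totals = {1: 100, 2: 120, 3: 110, 4: 130, 5: 90,
                6: 115, 7: 105, 8: 125, 9: 95, 10: 712}
flagged = flag_outliers(order_totals)
print(flagged)
```

With these values the mean is 170.2 and the sample standard deviation is roughly 190.8, so the upper threshold is about 551.8 and only order 10 is flagged.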

In [26]:
connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

#SQL query for flagging orders whose expected total is more than 2 standard deviations from the average
query = """ 
WITH OrderTotal AS (
    SELECT 
        o.order_id,
        o.customer_id,
        SUM(oi.quantity * i.item_price) AS expected_total_amount
    FROM Orders o
    JOIN Order_Items oi ON o.order_id = oi.order_id
    JOIN Items i ON oi.item_id = i.item_id
    GROUP BY o.order_id, o.customer_id
),
Stats AS (
    SELECT 
        AVG(expected_total_amount) AS avg_total,
        STDDEV(expected_total_amount) AS stddev_total
    FROM OrderTotal
)
SELECT 
    ot.order_id,
    ot.customer_id,
    ot.expected_total_amount,
    s.avg_total,
    s.stddev_total
FROM OrderTotal ot, Stats s
WHERE ot.expected_total_amount > s.avg_total + 2 * s.stddev_total
   OR ot.expected_total_amount < s.avg_total - 2 * s.stddev_total;

"""

cursor.execute(query)

result = cursor.fetchall()

#Fetching column names from the cursor description
columns = [desc[0] for desc in cursor.description]

# Converting to a DataFrame
df = pd.DataFrame(result, columns=columns)

print(df)

cursor.close()
connection.close()



   order_id  customer_id expected_total_amount             avg_total  \
0       198          217                712.00  259.3684210526315789   

       stddev_total  
0  210.305389196630  


Finally, we verify that every `order_id` in `Order_Items` exists in the `Orders` table (foreign key integrity). The query below returns any `order_id` that has no corresponding row in `Orders`; an empty result means the integrity check passes.

In [None]:
connection = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
    port=os.getenv("DB_PORT")
)

cursor = connection.cursor()

#SQL query for finding Order_Items rows whose order_id does not exist in the Orders table
query = """ 
SELECT oi.order_id
FROM Order_Items oi
LEFT JOIN Orders o ON oi.order_id = o.order_id
WHERE o.order_id IS NULL;

"""

cursor.execute(query)

result = cursor.fetchall()

print(result)

cursor.close()
connection.close()

[]
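
For daily monitoring, checks like the ones above could be collected in one place and run by a scheduler such as cron. The sketch below is one possible shape, not a finished monitor: `CHECKS` and `run_checks` are hypothetical names, the sample rows are made up, and Python's built-in `sqlite3` stands in for the PostgreSQL connection so that the example runs without a server.

```python
import sqlite3

# Each check is a query that returns the offending rows; zero rows means the check passes
CHECKS = {
    "null_total_amount": "SELECT order_id FROM Orders WHERE total_amount IS NULL",
    "orphan_order_items": """
        SELECT oi.order_id
        FROM Order_Items oi
        LEFT JOIN Orders o ON oi.order_id = o.order_id
        WHERE o.order_id IS NULL
    """,
}

def run_checks(conn, checks):
    """Run every check and return {check_name: number_of_offending_rows}."""
    return {name: len(conn.execute(sql).fetchall()) for name, sql in checks.items()}

# Hypothetical sample data with one NULL total and one orphaned line item
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER, total_amount REAL);
    CREATE TABLE Order_Items (order_id INTEGER, item_id INTEGER, quantity INTEGER);
    INSERT INTO Orders VALUES (1, 7, 84.0), (2, 8, NULL);
    INSERT INTO Order_Items VALUES (1, 100, 2), (3, 101, 1);  -- order 3 does not exist
""")

report = run_checks(conn, CHECKS)
failed = [name for name, count in report.items() if count > 0]
print(report, failed)
```

With psycopg2 the queries would go through a cursor instead of `conn.execute`, and a non-empty `failed` list could trigger an alert or fail the scheduled job.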


### 2. Troubleshooting Data Load Issues
Imagine a scenario where daily data loads are failing, and the new orders are not appearing
on the dashboard. Outline a step-by-step approach for diagnosing and resolving this issue.
Consider checkpoints in the ETL pipeline, database issues, and dashboard updates.

Deliverable
- Write-up detailing your troubleshooting process and key checkpoints.

Answer: 

To troubleshoot the missing daily data loads, I’d start by getting a clear picture of what’s going wrong. First, I’d check the ETL (Extract, Transform, Load) process and look through the logs for any recent changes, errors, or disruptions. I’d look for issues like changes to the ETL jobs, faulty filters, missing transformations, or failed job executions that could have caused the problem.

Next, I’d make sure the data being loaded is correct and complete. This means checking for any issues like NULL values, wrong data types, or strange special characters. If those checks aren’t already set up, I’d add filters to catch these types of errors early on in the process.
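
The early checks described above could be applied to the incoming file before anything is loaded. The following is a minimal sketch under stated assumptions: `validate_order_row` is a hypothetical helper, the column names follow the `Orders` table, and the sample rows are made up.

```python
import csv
import io
from datetime import date
from decimal import Decimal, InvalidOperation

def validate_order_row(row):
    """Return a list of problems found in one CSV row (an empty list means the row is fine)."""
    problems = []
    # Identifier columns must be present and contain only digits
    for column in ("order_id", "customer_id"):
        value = (row.get(column) or "").strip()
        if not value:
            problems.append(f"{column} is missing")
        elif not value.isdigit():
            problems.append(f"{column} is not an integer")
    # The order date must parse as an ISO date
    try:
        date.fromisoformat((row.get("order_date") or "").strip())
    except ValueError:
        problems.append("order_date is not a valid YYYY-MM-DD date")
    # The amount must be present and numeric
    amount = (row.get("total_amount") or "").strip()
    if not amount:
        problems.append("total_amount is missing")
    else:
        try:
            Decimal(amount)
        except InvalidOperation:
            problems.append("total_amount is not a number")
    return problems

sample = (
    "order_id,customer_id,order_date,total_amount\n"
    "1,7,2024-10-13,71.00\n"
    "2,,2024-13-01,abc\n"
)
results = [validate_order_row(row) for row in csv.DictReader(io.StringIO(sample))]
print(results)
```

Rows with a non-empty problem list could be written to a rejects file and logged, so that a failed load points directly at the offending input.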

I’d also double-check that the database connection is set up properly. I want to make sure the pipeline is pointing to the right database and that there are no connection issues. Duplicate data or slow loading times are also important to look at, as they can point to problems with the ETL process or data transformations.

The database schema is another important area to check. If there have been any changes to the schema, like renamed columns or mismatched data types, this could break the data load. I’d confirm that the schema still matches what the ETL pipeline expects, and ensure everything is properly normalized.

Finally, I’d review the dashboard itself to make sure data is being displayed correctly. I’d check if there have been any changes to the data source connection, visualizations, or filters that might be causing new data to be left out.

To make future troubleshooting easier, I’d create a checklist of potential issues. This would help me quickly diagnose similar problems down the road and ensure that troubleshooting steps are well-documented for consistency.