# Project: Retail Store Data Pipeline & Analysis

In this notebook we will build an end-to-end data pipeline including **data loading, cleaning, transformation, and SQL analysis**.

---

## Table of Contents

1. [Introduction](#intro)  
2. [Data Loading](#loading)  
3. [Data Wrangling](#wrangling)  
4. [Data Transformation](#transformation)
5. [SQL Loading](#sql)  
6. [Summary](#summary) 

---

<a id='intro'></a>
## Introduction

### Dataset Description

We will work with **retail store data from 2016-2018**. This dataset contains **9 CSV files** with information about products, customers, orders, and inventory.

The datasets include:

1. **Brands**: Brand information (`brand_id`, `brand_name`)  
2. **Categories**: Product categories (`category_id`, `category_name`)  
3. **Products**: Product catalog with prices (`product_id`, `product_name`, `brand_id`, `category_id`, `model_year`, `list_price`)  
4. **Customers**: Customer information with contact details (`customer_id`, `first_name`, `last_name`, `phone`, `email`, `street`, `city`, `state`, `zip_code`)    
5. **Orders**: Order headers (`order_id`, `customer_id`, `order_status`, `order_date`, `required_date`, `shipped_date`, `store_id`, `staff_id`)  
6. **Order Items**: Order line items (`order_id`, `item_id`, `product_id`, `quantity`, `list_price`, `discount`)  
7. **Staffs**: Store employees (`staff_id`, `first_name`, `last_name`, `email`, `phone`, `active`, `store_id`, `manager_id`)  
8. **Stores**: Store locations (`store_id`, `store_name`, `phone`, `email`, `street`, `city`, `state`, `zip_code`)  
9. **Stocks**: Inventory levels (`store_id`, `product_id`, `quantity`)  


<a id='loading'></a>
# 1- import packages and load data

In [None]:
import pandas as pd
import numpy as np

brands_df = pd.read_csv(r'D:\Downloads\Project\Data_Sources\brands.csv')
categories_df = pd.read_csv(r'D:\Downloads\Project\Data_Sources\categories.csv')
products_df = pd.read_csv(r'D:\Downloads\Project\Data_Sources\products.csv')
customers_df = pd.read_csv(r'D:\Downloads\Project\Data_Sources\customers.csv')
orders_df = pd.read_csv(r'D:\Downloads\Project\Data_Sources\orders.csv')
order_items_df = pd.read_csv(r'D:\Downloads\Project\Data_Sources\order_items.csv')
staffs_df = pd.read_csv(r'D:\Downloads\Project\Data_Sources\staffs.csv')
stores_df = pd.read_csv(r'D:\Downloads\Project\Data_Sources\stores.csv')
stocks_df = pd.read_csv(r'D:\Downloads\Project\Data_Sources\stocks.csv')

<a id='wrangling'></a>
## Data Wrangling



### General Properties

# 2- show some general properties for the data

In [None]:
#brands_df.head()
#categories_df.head()
#products_df.head()
#customers_df.head(15)
#orders_df.head(70)
#order_items_df.head()
#staffs_df.head(15)
#stores_df.head()
#stocks_df.head()

In [None]:
print("DATASET SHAPES:")
print(f"Brands: {brands_df.shape}")
print(f"Categories: {categories_df.shape}")
print(f"Products: {products_df.shape}")
print(f"Customers: {customers_df.shape}")
print(f"Orders: {orders_df.shape}")
print(f"Order Items: {order_items_df.shape}")
print(f"Staffs: {staffs_df.shape}")
print(f"Stores: {stores_df.shape}")
print(f"Stocks: {stocks_df.shape}")

In [None]:
print("\n DATA TYPES:")
print("\n Brands columns:")
print(brands_df.dtypes)
print("\n Categories columns:")
print(categories_df.dtypes)
print("\n Products columns:")
print(products_df.dtypes)
print("\n Customers columns:")
print(customers_df.dtypes)
print("\n Orders columns:")
print(orders_df.dtypes)
print("\n Order Items columns:")
print(order_items_df.dtypes)
print("\n Staffs columns:")
print(staffs_df.dtypes)
print("\n Stores columns:")
print(stores_df.dtypes)
print("\n Stocks columns:")
print(stocks_df.dtypes)

In [None]:
print("\n MISSING VALUES:")

print("\n Brands:")
print(brands_df.isna().sum())

print("\n Categories:")
print(categories_df.isna().sum())

print("\n Products:")
print(products_df.isna().sum())

print("\n Customers:")
print(customers_df.isna().sum())

print("\n Orders:")
print(orders_df.isna().sum())

print("\n Order Items:")
print(order_items_df.isna().sum())

print("\n Staffs:")
print(staffs_df.isna().sum())

print("\nStores:")
print(stores_df.isna().sum())

print("\n Stocks:")
print(stocks_df.isna().sum())

In [None]:
print("DATASET DUPLICATES:")
print(f"Brands: {brands_df.duplicated().sum()}")
print(f"Categories: {categories_df.duplicated().sum()}")
print(f"Products: {products_df.duplicated().sum()}")
print(f"Customers: {customers_df.duplicated().sum()}")
print(f"Orders: {orders_df.duplicated().sum()}")
print(f"Order Items: {order_items_df.duplicated().sum()}")
print(f"Staffs: {staffs_df.duplicated().sum()}")
print(f"Stores: {stores_df.duplicated().sum()}")
print(f"Stocks: {stocks_df.duplicated().sum()}")


# 3- Summary of results from previous functions

In [None]:
print("""
Cleaned DataFrames:

- Brand DataFrame
- Categories DataFrame


DataFrames needing cleaning:

1- Products DataFrame
- DUPLICATES: remove

2- Customers DataFrame
- MISSING VALUES: 'phone' have NaNs
- FORMAT ISSUES: Phone numbers may have multiple values, spaces, special characters
- MISSING CALCULATED COLUMN: Need 'full_name'

3- Orders DataFrame
- MISSING VALUES: 'shipped_date' have NaNs
- WRONG DATA TYPES: order_date ,required_date ,shipped_date ,stored as text instead of datetime

4- Order Items DataFrame
- DUPLICATES: remove
- WRONG RANGE OF VALUES: Quantities may be negative
 In the transformation:
 - MISSING CALCULATED COLUMN: Need 'total_price'

5- Staffs DataFrame
- MISSING VALUES: 'phone','email','last_name ,'store_id','manager_id' have NaNs
- drop 'store_id','manager_id' (Reason: these columns contain too many missing values and are not useful for analysis)
- FORMAT ISSUES: Phone numbers may have multiple values, spaces, special characters

6- Stores DataFrame
- MISSING VALUES: 'zip_code','email' have NaNs
- WRONG DATA TYPES: 'zip_code' stored as float instead of string (to allow filling with 'Unknown')
- FORMAT ISSUES: Phone numbers may have multiple values, spaces, special characters

7- Stocks DataFrame
- WRONG RANGE OF VALUES: Quantities may be negative
""")


# cleaning


### 4- Standardize Column Names
> Tip: Convert all column names to lowercase and replace spaces with underscores

In [None]:
brands_df.columns = brands_df.columns.str.lower().str.replace(' ', '_')
categories_df.columns = categories_df.columns.str.lower().str.replace(' ', '_')
products_df.columns = products_df.columns.str.lower().str.replace(' ', '_')
customers_df.columns = customers_df.columns.str.lower().str.replace(' ', '_')
orders_df.columns = orders_df.columns.str.lower().str.replace(' ', '_')
order_items_df.columns = order_items_df.columns.str.lower().str.replace(' ', '_')
staffs_df.columns = staffs_df.columns.str.lower().str.replace(' ', '_')
stores_df.columns = stores_df.columns.str.lower().str.replace(' ', '_')
stocks_df.columns = stocks_df.columns.str.lower().str.replace(' ', '_')

### 5 - clean  ` products ` Table
> Tip: Remove duplicates



In [None]:
products_df = products_df.drop_duplicates()

### 6 - clean ` Customers ` Table
> Tip: Handle missing phone , clean phone numbers

In [None]:
customers_df['phone'] = customers_df['phone'].fillna('Unknown')

In [None]:
def clean_phone(phone):
  if phone == 'Unknown':
        return phone
  if ',' in phone:
        phone = phone.split(',')[0]
  phone = ''.join(filter(str.isdigit, phone))
  return phone

customers_df['phone'] = customers_df['phone'].apply(clean_phone)

In [None]:
customers_df.head()

### 7 - clean ` Orders ` Table
> Tip: Convert dates to datetime ,
After converting date columns to datetime -->Drop rows where order_date is missing (because the order cannot exist without a valid order date) , 
After converting date columns to datetime --> missing values in required_date were left as NULL to avoid introducing incorrect assumptions, Handle missing shipped_date

In [None]:
orders_df['order_date'] = pd.to_datetime(orders_df['order_date'], errors='coerce')
orders_df['required_date'] = pd.to_datetime(orders_df['required_date'], errors='coerce')
orders_df['shipped_date'] = pd.to_datetime(orders_df['shipped_date'], errors='coerce')

In [None]:
orders_df = orders_df.dropna(subset=['order_date'])

In [None]:
orders_df['shipped_date'] = orders_df['shipped_date'].fillna(orders_df['required_date'])

In [None]:
orders_df.head()

### 8 - clean `  Order Items ` Table
> Tip: Remove duplicates , Remove negative quantities

In [None]:
order_items_df = order_items_df.drop_duplicates()

In [None]:
order_items_df = order_items_df[order_items_df['quantity'] > 0]

In [None]:
order_items_df.head()   

### 9 - clean `   Staffs ` Table
> Tip:  Handle missing phone and email and last_name , dropna store_id and manager_id , clean phone numbers

In [None]:
staffs_df['phone'] = staffs_df['phone'].fillna('Unknown')
staffs_df['email'] = staffs_df['email'].fillna('unknown@email.com')
staffs_df['last_name'] = staffs_df['last_name'].fillna('Unknown')

In [None]:
staffs_df['phone'] = staffs_df['phone'].apply(clean_phone)

In [None]:
staffs_df.head()

### 10 - clean `  Stores ` Table
> Tip:  Handle missing email and phone , Convert zip_code to string and Handle missing , clean phone numbers

In [None]:
stores_df['email'] = stores_df['email'].fillna('unknown@email.com')
stores_df['phone'] = stores_df['phone'].fillna('Unknown')

In [None]:
stores_df['zip_code'] = stores_df['zip_code'].astype('string')
stores_df['zip_code'] = stores_df['zip_code'].fillna("Unknown")

In [None]:
stores_df['phone'] = stores_df['phone'].apply(clean_phone)

In [None]:
stores_df.head()

### 11 - clean `  Stocks ` Table
> Tip: Remove negative quantities

In [None]:
stocks_df = stocks_df[stocks_df['quantity'] > 0]

In [None]:
stocks_df.head()

<a id='transformation'></a>
## Data Transformation

### 12 - Merge products with brands and categories



we will merge the `products_df` with `brands_df` and `categories_df`  
to have a single DataFrame containing all product information along with the brand name and category name

In [None]:
products_merged = products_df.merge(brands_df, on='brand_id', how='left')
products_merged = products_merged.merge(categories_df, on='category_id', how='left')

In [None]:
products_merged.head()

### 13 - Calculate total_price for Order Items
> Tip: total_price = quantity * list_price

In [None]:
order_items_df['total_price'] = order_items_df['quantity'] * order_items_df['list_price']

In [None]:
order_items_df.head()

### 14- Calculate Order Total Amount
> Tip: Group by order_id and sum total_price

In [None]:
order_totals =(order_items_df
               .groupby('order_id')
               ['total_price'].sum()
               .reset_index()
               )
order_totals.columns = ['order_id', 'order_total_amount']

In [None]:
order_totals

### 15 - create  customer full_name

In [None]:
customers_df['full_name'] = customers_df['first_name'] + ' ' + customers_df['last_name']

In [None]:
customers_df.head()

### 16 - Save Cleaned Data


In [None]:
brands_df.to_csv(r'D:\Downloads\Project\Cleaned_datasets\cleaned_brands.csv', index=False)
categories_df.to_csv(r'D:\Downloads\Project\Cleaned_datasets\cleaned_categories.csv', index=False)
products_df.to_csv(r'D:\Downloads\Project\Cleaned_datasets\cleaned_products.csv', index=False)
customers_df.to_csv(r'D:\Downloads\Project\Cleaned_datasets\cleaned_customers.csv', index=False)
orders_df.to_csv(r'D:\Downloads\Project\Cleaned_datasets\cleaned_orders.csv', index=False)
order_items_df.to_csv(r'D:\Downloads\Project\Cleaned_datasets\cleaned_order_items.csv', index=False)
staffs_df.to_csv(r'D:\Downloads\Project\Cleaned_datasets\cleaned_staffs.csv', index=False)
stores_df.to_csv(r'D:\Downloads\Project\Cleaned_datasets\cleaned_stores.csv', index=False)
stocks_df.to_csv(r'D:\Downloads\Project\Cleaned_datasets\cleaned_stocks.csv', index=False)

### 17 - Final Check


> After finishing cleaning and loading, make sure:
   *   There is no wrong data type
   *   There is no NaN values (or handled appropriately)
   *   All columns are clean and ready for analysis

In [None]:

print("   FINAL CHECK   ")

print("\n Dataset Shapes:")
print(f"Brands: {brands_df.shape}")
print(f"Categories: {categories_df.shape}")
print(f"Products: {products_df.shape}")
print(f"Customers: {customers_df.shape}")
print(f"Orders: {orders_df.shape}")
print(f"Order Items: {order_items_df.shape}")
print(f"Staffs: {staffs_df.shape}")
print(f"Stores: {stores_df.shape}")
print(f"Stocks: {stocks_df.shape}")

print("\n Total Missing Values:")
print(f"Brands: {brands_df.isna().sum().sum()}")
print(f"Categories: {categories_df.isna().sum().sum()}")
print(f"Products: {products_df.isna().sum().sum()}")
print(f"Customers: {customers_df.isna().sum().sum()}")
print(f"Orders: {orders_df.isna().sum().sum()}")
print(f"Order Items: {order_items_df.isna().sum().sum()}")
print(f"Staffs: {staffs_df.isna().sum().sum()}")
print(f"Stores: {stores_df.isna().sum().sum()}")
print(f"Stocks: {stocks_df.isna().sum().sum()}")

print("\n Data Types Check:")
print("Customers:", customers_df.dtypes.tolist())
print("Orders:", orders_df.dtypes.tolist())
print("Order Items:", order_items_df.dtypes.tolist())



<a id='sql'></a>
## Load to SQL Server

### 1 - Connect to SQL Server

In [None]:
from sqlalchemy import create_engine
from urllib.parse import quote_plus

SERVER = r"GEMY"
DATABASE = "RetailDB"
ODBC_DRIVER = "ODBC Driver 17 for SQL Server"

conn_str = (
    f"DRIVER={{{ODBC_DRIVER}}};"
    f"SERVER={SERVER};"
    f"DATABASE={DATABASE};"
    "Trusted_Connection=yes;"
)

engine = create_engine(
    "mssql+pyodbc:///?odbc_connect=%s" % quote_plus(conn_str),
    fast_executemany=True
)


### 2 - Load Tables to SQL Server

In [None]:
with engine.begin() as conn:
    brands_df.to_sql('Brands', conn, if_exists='append', index=False)

In [None]:
with engine.begin() as conn:
    categories_df.to_sql('Categories', conn, if_exists='append', index=False)

In [None]:
with engine.begin() as conn:
    products_df.to_sql('Products', conn, if_exists='append', index=False)

In [None]:
with engine.begin() as conn:
    stores_df.to_sql('Stores', conn, if_exists='append', index=False)

In [None]:
# Create a copy of the DataFrame to avoid modifying the original one
staffs_df = staffs_df.copy()

# -------------------------------
# 1. Fix store_id issues
# Keep only rows where the store_id exists in the Stores table
# If a store_id is not found in stores_df, that staff row will be removed
staffs_df = staffs_df[staffs_df['store_id'].isin(stores_df['store_id'])]

# -------------------------------
# 2. Fix manager_id issues
# If manager_id is NaN or does not match any staff_id, set it to None
# SQL Server will interpret None as NULL
staffs_df['manager_id'] = staffs_df['manager_id'].where(
    staffs_df['manager_id'].isin(staffs_df['staff_id']), None
)

# -------------------------------
# 3. Convert columns to integer
# We can safely convert store_id and staff_id to int because there are no NaN values
staffs_df['store_id'] = staffs_df['store_id'].astype(int)
staffs_df['staff_id'] = staffs_df['staff_id'].astype(int)

# -------------------------------
# 4. Upload the cleaned DataFrame to SQL Server
with engine.begin() as conn:
    staffs_df.to_sql('Staffs', conn, if_exists='append', index=False)


In [None]:
customers_df = customers_df.drop(columns=['full_name'])

In [None]:
with engine.begin() as conn:
    customers_df.to_sql('Customers', conn, if_exists='append', index=False)

In [None]:
with engine.begin() as conn:
    orders_df.to_sql('Orders', conn, if_exists='append', index=False)

In [None]:
order_items_df = order_items_df.drop(columns=['total_price'])


In [None]:
orders_in_db = pd.read_sql("SELECT order_id FROM Orders", engine)

In [None]:
# Keep only rows where order_id exists in the Orders table
# This avoids foreign key constraint errors in SQL Server
valid_order_items = order_items_df[order_items_df['order_id'].isin(orders_in_db['order_id'])]

# -------------------------------
# Print how many rows are valid and how many will be rejected
print(f"Number of valid rows to insert: {len(valid_order_items)}")  # Number of valid rows
print(f"Number of rows that will be rejected: {len(order_items_df) - len(valid_order_items)}")  # Number of rejected rows



In [None]:
with engine.begin() as conn:
    valid_order_items.to_sql('OrderItems', conn, if_exists='append', index=False)

In [None]:
with engine.begin() as conn:
    stocks_df.to_sql('Stocks', conn, if_exists='append', index=False)

<a id='summary'></a>
### Write a Summary About All the Cleaning Steps

In [None]:
print("""
DATA CLEANING SUMMARY

1. LOADED DATA:
   - Loaded 9 CSV files successfully
   - Checked shapes and data types and missing,duplicates values

2. STANDARDIZED COLUMN NAMES:
   - Converted all column names to lowercase
   - Replaced spaces with underscores
      
3. Cleaned DataFrames:

   - Brand DataFrame
   - Categories DataFrame


4. DataFrames needing cleaning:

    1- Products DataFrame
    - DUPLICATES: remove

    2- Customers DataFrame
    - MISSING VALUES: 'phone' have NaNs
    - FORMAT ISSUES: Phone numbers may have multiple values, spaces, special characters
    - MISSING CALCULATED COLUMN: Need 'full_name'

    3- Orders DataFrame
    - MISSING VALUES: 'shipped_date' have NaNs
    - WRONG DATA TYPES: order_date ,required_date ,shipped_date ,stored as text instead of datetime

    4- Order Items DataFrame
    - DUPLICATES: remove
    - WRONG RANGE OF VALUES: Quantities may be negative
    In the transformation:
        - MISSING CALCULATED COLUMN: Need 'total_price'

    5- Staffs DataFrame
    - MISSING VALUES: 'phone','email','last_name ,'store_id','manager_id' have NaNs
    - drop 'store_id','manager_id' (Reason: these columns contain too many missing values and are not useful for analysis)
    - FORMAT ISSUES: Phone numbers may have multiple values, spaces, special characters
 
    6- Stores DataFrame
    - MISSING VALUES: 'zip_code','email' have NaNs
    - WRONG DATA TYPES: 'zip_code' stored as float instead of string (to allow filling with 'Unknown')
    - FORMAT ISSUES: Phone numbers may have multiple values, spaces, special characters

    7- Stocks DataFrame
    - WRONG RANGE OF VALUES: Quantities may be negative

4. DATA TRANSFORMATION:
    - Calculated total_price for each order item
    - Calculated order_total_amount for each order
    - Created full_name for customers

5. SAVED CLEANED DATA:
    - Exported all tables as CSV files
    - Loaded all tables to SQL Server
""")