# Ecommerce Sales & Orders Data Pipeline System

 This project simulates a real-world data pipeline for an ecommerce platform. It uses Python and MySQL to ingest, clean, transform, and analyze data from a CSV source file. The focus is on showcasing SQL integration, data processing, and business intelligence using Python and SQL.

#    Database Schema

The system contains the following tables:

- customers
- products
- orders
- order_items
- payments
  
(SQL schema executed externally using MySQL Workbench. Schema and view definitions are available in separate .sql files.)

## Data Ingestion

In [None]:
import pandas as pd
import mysql.connector
from mysql.connector import connect


df = pd.read_csv("./data/ecom-sales-data1.csv")
df.head()

## Data Cleaning

In [None]:
df.columns = df.columns.str.strip().str.lower()

df['unit_price'] = df['unit_price'].replace(r'[\$,]', '', regex=True).astype(float)
df['total_price'] = df['unit_price'].replace(r'[\$,]','', regex= True).astype(float)

date_columns = ['order_date', 'registered_date']
for col in date_columns:
    df[col]= pd.to_datetime(df[col],format = 'mixed')

In [None]:
#drop duplicates

df.drop_duplicates(inplace= True)

#reset index

df.reset_index(drop= True, inplace= True)

# split the cleaned dataframe into tabe 
#remove duplicates in each table-specific view

#customer table
customers_df= df[['customer_id', 'customer_name', 'email', 'country', 'registered_date']].drop_duplicates() 

#prducts table
products_df= df[['product_id', 'product_name', 'category', 'unit_price', 'quantity']].drop_duplicates()

#orders table
orders_df= df[['order_id', 'customer_id', 'order_date', 'total_price', 'status']].drop_duplicates()

#order_items table
order_items_df= df[['item_id', 'order_id', 'product_id', 'quantity', 'unit_price']].drop_duplicates()

#payments table
payments_df= df[['payment_id', 'order_id', 'customer_id', 'payment_method', 'total_price']].drop_duplicates()

## SQL Database Connection & Table creation

In [None]:
#Insert data into MYSQL database

try: 
    connection = mysql.connector.connect(
        host = "localhost",
        user = "your_username",
        password = "your_password",
        database = "sales_orders_db"
    )
    cursor = connection.cursor()
    print("Connected to SQL")
except Error as e:
    print("Error:", e)

## Insert Clean data to MYSQL

In [None]:
#insert data from dataframe to sql table:

for _, row in df.iterrows():
    #insert into customers
        
    sql = """
    INSERT IGNORE INTO customers (
    customer_id, customer_name, email, registered_date, country)
    Values(%s, %s, %s, %s, %s)
    """
    data = (
        (row['customer_id']),
        (row['customer_name']),
        (row['email']),
        (row['registered_date']),
        (row['country'])
    )

    cursor.execute(sql, data)

    #insert into products
    
    sql = """ 
    INSERT IGNORE INTO products (
    product_id, product_name, category, quantity, unit_price)
    Values(%s, %s, %s, %s, %s)
    """
    data = (
        (row['product_id']),
        (row['product_name']),
        (row['category']),
        int(row['quantity']),
        float(row['unit_price'])
    )
    
    cursor.execute(sql, data)

    #insert into orders
    
    sql ="""
    INSERT IGNORE INTO orders (
    order_id, customer_id, order_date, total_price, status)
    Values(%s, %s, %s, %s, %s)
    """
    data = (
        (row['order_id']),
        (row['customer_id']),
         (row['order_date']),
        float(row['total_price']),
        (row['status'])
    )

    cursor.execute(sql, data) 

    #insert into order_item
    
    sql ="""
    INSERT IGNORE INTO order_items (
    item_id, order_id, product_id, quantity, unit_price)
    Values(%s, %s, %s, %s, %s)
    """
    data = (
        (row['item_id']),
        (row['order_id']),
        (row['product_id']),
        int(row['quantity']),
        float(row['unit_price'])
    )
   
    cursor.execute(sql, data)

    #insert into payments
    
    sql ="""
     INSERT IGNORE INTO payments(
    payment_id, order_id, customer_id, payment_method, total_price)
    Values(%s, %s, %s, %s, %s)
    """
    data = (
        (row['payment_id']),
        (row['order_id']),
        (row['customer_id']),
        (row['payment_method']),
        float(row['total_price']),
        )
    
    cursor.execute(sql, data)

connection.commit()
print("All data inserted!")

## SQL Queries for Analysis

In [None]:
import matplotlib
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd
import mysql.connector
from mysql.connector import connect
import sqlalchemy
from sqlalchemy import create_engine

In [None]:
query1 = """ 
select
	o.order_id,
    c.customer_name,
    o.order_date,
    o.total_price
from orders o
join customers c on o.customer_id = c.customer_id
left join payments p on o.order_id = p.order_id
where p.payment_id is null;
"""
df_unpaid = pd.read_sql(query1, connection)
df_unpaid.head()

## Insight and Visualizations

In [None]:
query = """ 
select
	c.customer_name,
    sum(o.total_price) as total_spent
from customers c
join orders o on c.customer_id = o.customer_id
group by c.customer_name
order by total_spent desc
limit 5 ;
"""
df_top_customers = pd.read_sql(query, connection)


sns.barplot(data=df_top_customers, x= 'total_spent', y = 'customer_name')
plt.title("Top 5 Customers by Total Spend")
plt.xlabel("Total Spent")
plt.ylabel("Customer")
plt.show()

In [None]:
query = """ 
select
	p.product_name,
    p.category,
	sum(oi.quantity) as total_sold
from
	products p
join order_items oi on p.product_id = oi.product_id
group by p.product_name,p.category
order by total_sold desc
limit 5;
"""
df_best_product = pd.read_sql(query, connection)


sns.barplot(data=df_best_product, x= 'total_sold', y = 'product_name')
plt.title("Top 5 Best Selling Products")
plt.xlabel("Unit Sold")
plt.ylabel("Products")
plt.show()

In [None]:
query = """ 
select
	p.category,
    sum(oi.quantity * oi.unit_price ) as total_revenue
from products p
join order_items oi on p.product_id = oi.product_id
group by p.category
order by total_revenue desc;
"""
df_category_revenue = pd.read_sql(query, connection)


sns.barplot(data=df_category_revenue, x= 'category', y = 'total_revenue')
plt.title("Revenue by Product Category")
plt.xlabel("Categories")
plt.ylabel("Total Revenue")
plt.xticks(rotation = 30)
plt.grid(True)
plt.show()

In [None]:
query = """ 
select
	date_format(order_date, '%Y-%m') as month,
	count(order_id) as total_orders,
    sum(total_price) as revenue
from orders
group by month
order by month;	
"""
df_monthly_revenue = pd.read_sql(query, connection)


sns.barplot(data=df_monthly_revenue, x= 'month', y = 'revenue')
plt.title("Monthly Revenue")
plt.xlabel("Month")
plt.ylabel("Revenue")
plt.xticks(rotation = 75)
plt.grid(False)
plt.show()

In [None]:
query = """ 
select
    c.country,
    count(o.order_id) as total_orders,
    sum(o.total_price) as total_revenue,
    round(avg(o.total_price),2) as avd_order_value
from customers c
join orders o on c.customer_id = o.customer_id
group by c.country
order by total_revenue desc;
"""
df_order_per_country = pd.read_sql(query, connection)


sns.barplot(data=df_order_per_country, x= 'country', y = 'total_orders')
plt.title("Average Order per Country")
plt.xlabel("Countries")
plt.ylabel("Total Orders")
plt.xticks(rotation = 75)
plt.grid(False)
plt.show()

## Conclusion

This project demonstrates an end-to-end data engineering and analysis workflow using SQL and Python. It includes:
- Schema design and table creation in MySQL
- Data ingestion and cleaning from a CSV source
- Insertion of clean data into SQL
- Business-focused queries and visualizations

This project can serve as a foundation for building advanced reporting tools, dashboards, or real-time data pipelines.