In [None]:
import pandas as pd
import phonenumbers
import mysql.connector
from phonenumbers import NumberParseException
from datetime import datetime
from mysql.connector import Error

In [None]:
# Per-table data-quality counters; each pipeline stage fills in its own entries.
data_quality = {table: {} for table in ("customers", "products", "sales")}

### Extract data

In [None]:
def read_data(filename):
    """Read the CSV file at ``filename`` and return its contents as a DataFrame."""
    return pd.read_csv(filename)

In [None]:
# Load the raw customers CSV and record how many rows were read.
customers_df = read_data('customers_raw.csv')
data_quality["customers"]["records_read"] = len(customers_df)
print(customers_df.shape)
customers_df.head()

Drop exact duplicate customer rows.

In [None]:
# Remove rows that are identical across all columns and log how many were dropped.
before = len(customers_df)
customers_df = customers_df.drop_duplicates()
data_quality["customers"]["duplicates_removed"] = before - len(customers_df)
customers_df.shape

Drop customer rows with a missing email.

In [None]:
# Rows without an email are discarded; the number of discarded rows is what
# gets recorded as "missing_values_handled" for customers.
before = len(customers_df)
customers_df = customers_df.dropna(subset=['email'])
data_quality["customers"]["missing_values_handled"] = before - len(customers_df)
customers_df.shape

Standardize phone numbers to E.164 format.

In [None]:
def standardize_phone(phone):
    """Normalize a raw phone value to E.164, using India ("IN") as default region.

    Parameters
    ----------
    phone : str | int | float | None
        Raw phone value as read from the CSV.

    Returns
    -------
    str | None
        The number formatted as E.164, or ``None`` when the value is missing,
        cannot be parsed, or is not a valid number.
    """
    if pd.isna(phone):
        return None

    # pandas reads an all-digit column that contains blanks as float64, so a
    # number can arrive as 9876543210.0. str() would then carry the ".0" into
    # the text handed to the parser; collapse whole floats to int first.
    if isinstance(phone, float) and phone.is_integer():
        phone = int(phone)

    try:
        parsed = phonenumbers.parse(str(phone), "IN")
    except NumberParseException:
        return None

    if not phonenumbers.is_valid_number(parsed):
        return None
    return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)

# Run the standardizer over every phone value; values it rejects become None.
customers_df['phone'] = customers_df['phone'].apply(standardize_phone)
customers_df

Normalize registration dates to `YYYY-MM-DD`.

In [None]:
# Parse each registration_date (format='mixed' lets every value use its own
# format), coerce unparseable values to missing, and render the rest as
# YYYY-MM-DD strings.
customers_df['registration_date'] = pd.to_datetime(customers_df['registration_date'], format='mixed', errors='coerce').dt.strftime("%Y-%m-%d")
customers_df['registration_date']

Standardize city names (trim whitespace, title case).

In [None]:
# Trim surrounding whitespace and title-case every city name.
customers_df['city'] = customers_df['city'].str.strip().str.title()
customers_df['city']

In [None]:
# dropping customer id - it will be auto generated by sql
# customers_df itself keeps customer_id (it is needed later to link sales rows);
# only the copy destined for the database drops it.
customers_df_db = customers_df.drop('customer_id', axis=1)
customers_df_db

## Products Data

In [None]:
# Load the raw products CSV and record how many rows were read.
products_df = read_data('products_raw.csv')
# Count the products frame itself (this previously recorded len(customers_df)).
data_quality["products"]["records_read"] = len(products_df)
products_df.shape

In [None]:
# dropping duplicates
# Rows identical across all columns are removed; the row count before the
# drop and the number of rows dropped are both printed.
before = len(products_df)
print(before)
products_df = products_df.drop_duplicates()
data_quality["products"]["duplicates_removed"] = before - len(products_df)
print(before - len(products_df))

In [None]:
# drop products with missing price
# The dropped-row count is kept in missing_price_count so it can be added to
# the products "missing_values_handled" figure later.
before = len(products_df)
products_df = products_df.dropna(subset='price')
missing_price_count = before - len(products_df)
products_df.shape

In [None]:
# standardize category names (title case)
products_df['category'] = products_df['category'].str.title()
products_df['category']

In [None]:
# fill missing stock value with 0
# The number of missing values is captured before filling so it can be reported;
# the column is then cast to int.
missing_stock_count = products_df["stock_quantity"].isna().sum()
products_df['stock_quantity'] = products_df['stock_quantity'].fillna(0).astype(int)
products_df['stock_quantity']

In [None]:
# Missing values handled for products = rows dropped for a missing price
# + stock quantities that were filled with 0.
data_quality["products"]["missing_values_handled"] = (
    missing_price_count + missing_stock_count
)

In [None]:
# removing leading/trailing whitespace in product names
products_df['product_name'] = products_df['product_name'].str.strip()

In [None]:
# removing duplicate product names
# drop_duplicates returns a new frame, so the result has to be assigned back
# (the bare call only displayed the de-duplicated frame and changed nothing).
# The first row seen for each product_name is kept.
# NOTE(review): sales rows that reference the CSV product_id of a dropped
# duplicate will no longer match in the product lookup further down - confirm
# that is the intended outcome.
products_df = products_df.drop_duplicates(subset='product_name')
products_df.head()

In [None]:
# dropping product id - it will be auto generated by sql
# products_df keeps product_id; only the copy destined for the database drops it.
products_df_db = products_df.drop('product_id', axis=1)
products_df_db

In [None]:
# Inspect the column dtypes of the frame that will be loaded.
products_df_db.dtypes

## Loading data to SQL database

In [None]:
# create a mysql connection
def get_mysql_connection():
    """Open and return a new MySQL connection for the FlexiMart database.

    Connection settings are read from environment variables so that no
    credentials are stored in the notebook:

    - ``FLEXIMART_DB_HOST``      (default ``"localhost"``)
    - ``FLEXIMART_DB_USER``      (default ``"root"``)
    - ``FLEXIMART_DB_PASSWORD``  (required)
    - ``FLEXIMART_DB_NAME``      (default ``"fleximart"``)

    Returns
    -------
    The connection object returned by ``mysql.connector.connect``.

    Raises
    ------
    RuntimeError
        If ``FLEXIMART_DB_PASSWORD`` is not set.
    mysql.connector.Error
        If the connection attempt fails (the error is printed, then re-raised).
    """
    import os  # local import keeps this cell self-contained

    password = os.environ.get("FLEXIMART_DB_PASSWORD")
    if password is None:
        raise RuntimeError(
            "Set the FLEXIMART_DB_PASSWORD environment variable before connecting to MySQL."
        )

    try:
        connection = mysql.connector.connect(
            host=os.environ.get("FLEXIMART_DB_HOST", "localhost"),
            user=os.environ.get("FLEXIMART_DB_USER", "root"),
            password=password,
            database=os.environ.get("FLEXIMART_DB_NAME", "fleximart"),
        )
        return connection
    except Error as e:
        print("Error while connecting to MySQL:", e)
        raise


In [None]:
# load data in database
def load_data_db(df: pd.DataFrame, table_name):
    """Insert every row of ``df`` into ``fleximart.<table_name>``, one row at a time.

    Rows rejected by MySQL are reported and skipped; the accepted rows are
    committed together once the loop finishes.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose column names match the target table's column names.
    table_name : str
        Name of the target table inside the ``fleximart`` schema. It is
        interpolated into the SQL text, so it must be a trusted identifier.

    Returns
    -------
    int
        Number of rows whose INSERT succeeded.
    """
    connection = get_mysql_connection()
    try:
        cursor = connection.cursor()
        try:
            cols_names = ",".join(df.columns)
            placeholder = ",".join(["%s"] * len(df.columns))
            insert_query = f"insert into fleximart.{table_name} ({cols_names}) values ({placeholder})"
            print(insert_query)

            inserted_rows = 0

            # Casting to object first means the driver is handed the boxed
            # per-cell objects rather than values from a numeric numpy row.
            for record in df.astype(object).itertuples(index=False, name=None):
                # NaN / NaT / None all mean "missing": send them as SQL NULL.
                values = tuple(None if pd.isna(value) else value for value in record)
                try:
                    cursor.execute(insert_query, values)
                    inserted_rows += 1
                except Error as e:
                    # Best effort: report the bad row and keep loading the rest.
                    print(f"Failed to insert into {table_name}: {e}")

            connection.commit()
        finally:
            # Always release the cursor, even if something unexpected is raised.
            cursor.close()
    finally:
        connection.close()

    print(f"{inserted_rows} rows inserted into {table_name}.")
    return inserted_rows

In [None]:
# load products data into database
# The number of successfully inserted rows is recorded as "records_loaded".
inserted_products = load_data_db(df=products_df_db,table_name='products')
data_quality["products"]["records_loaded"] = inserted_products

In [None]:
# fetch auto-generated product IDs from sql database
def fetch_product_id_map():
    """Return a ``{product_name: product_id}`` dict read from the ``products`` table.

    If several rows share a product_name, the last row returned by the query
    wins, because later dict entries overwrite earlier ones.
    """
    connection = get_mysql_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute("SELECT product_id, product_name FROM products")
            rows = cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        connection.close()

    # Create mapping
    return {name: pid for pid, name in rows}

fetch_product_id_map()

In [None]:
# Preview the customers frame that is about to be inserted.
customers_df_db

In [None]:
# load customers data into database
# The number of successfully inserted rows is recorded as "records_loaded".
inserted_customers = load_data_db(df=customers_df_db,table_name='customers')
data_quality["customers"]["records_loaded"] = inserted_customers

## Sales Data

In [None]:
# Load the raw sales CSV and record how many rows were read.
sales_df = read_data('sales_raw.csv')
data_quality["sales"]["records_read"] = len(sales_df)
sales_df.shape

In [None]:
# drop duplicate records
# Two rows count as duplicates when they agree on customer, product, quantity,
# unit price and transaction date; transaction_id is not part of the comparison.
before = len(sales_df)
sales_df = sales_df.drop_duplicates(subset=['customer_id','product_id','quantity',
 'unit_price',
 'transaction_date'])
data_quality["sales"]["duplicates_removed"] = before - len(sales_df)
sales_df.shape

In [None]:
# drop rows with missing customer id and product id
# A row is dropped when either of the two ids is missing; the dropped-row count
# is recorded as "missing_values_handled" for sales.
before = len(sales_df)
sales_df = sales_df.dropna(subset=['customer_id','product_id'])
data_quality["sales"]["missing_values_handled"] = before - len(sales_df)
sales_df.shape

In [None]:
# date formatting
# Parse each transaction_date (format='mixed' lets every value use its own
# format), coerce unparseable values to missing, and render the rest as
# YYYY-MM-DD strings.
sales_df['transaction_date'] = pd.to_datetime(sales_df['transaction_date'], format='mixed', errors='coerce').dt.strftime("%Y-%m-%d")
sales_df['transaction_date']

In [None]:
# List the sales column names.
sales_df.columns.to_list()

In [None]:
# Inspect the cleaned sales frame.
sales_df

In [None]:
# creating total amount column (quantity x unit price, per sales row)
sales_df['total_amount'] = sales_df['quantity'] * sales_df['unit_price']
sales_df

In [None]:
# creating orders dataframe
# One order row per sales row; .copy() detaches it from sales_df so later
# column edits do not touch the sales frame.
orders_df = sales_df[['transaction_id','customer_id','transaction_date','total_amount','status']].copy()

In [None]:
# renaming columns as per db schema
# Only transaction_date is renamed; transaction_id keeps its CSV name because
# it is used later to join order ids back onto sales_df.
orders_df = orders_df.rename(columns={'transaction_date': 'order_date'})

In [None]:
# Preview the first orders rows.
orders_df.head()

In [None]:
# fetch auto-generated customer IDs from sql database
def fetch_customer_id_map():
    """Return a ``{(email, phone): customer_id}`` dict read from the ``customers`` table.

    The (email, phone) pair acts as a composite key for finding the
    database-generated customer_id of a customer that came from the CSV.
    """
    connection = get_mysql_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute("select customer_id, email, phone from customers")
            rows = cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        connection.close()

    # Create mapping => composite key: (email, phone) -> customer_id
    return {
        (email, phone): customer_id
        for customer_id, email, phone in rows
    }

# fetch_customer_id_map()

### Merging email and phone from `customers_df` into `orders_df`

In [None]:
# Lookup table from the CSV customer_id to email/phone. The id column is
# renamed so it does not collide with orders_df's own customer_id in the merge.
customers_lookup_df = customers_df[['customer_id','email','phone']].copy()
customers_lookup_df = customers_lookup_df.rename(columns={'customer_id': 'customer_id_map'})
customers_lookup_df

In [None]:
# Left join: every order is kept; orders whose customer_id has no match in the
# lookup get missing email/phone.
orders_df = orders_df.merge(
    customers_lookup_df,
    how='left',
    left_on='customer_id', # from sales/orders
    right_on='customer_id_map' # from customers
)
orders_df = orders_df.copy()

In [None]:
# drop missing values
# Orders are dropped when email OR phone is missing, so orders of customers
# whose phone could not be standardized are removed here as well.
orders_df = orders_df.dropna(subset=['email','phone'])
orders_df

In [None]:
# create composite key
# Work on an explicit copy so the column assignments below write to an
# independent frame (same pattern the product-id mapping cell uses).
orders_df = orders_df.copy()
orders_df['customer_key'] = list(
    zip(orders_df['email'], orders_df['phone'])
)

# fetch mapping once
customer_id_dict = fetch_customer_id_map()

# apply mapping: (email, phone) -> database customer_id
orders_df['customer_id'] = orders_df['customer_key'].map(customer_id_dict)

# drop unmapped rows; the ids can only be cast to int once the gaps are gone
orders_df = orders_df.dropna(subset=['customer_id']).copy()
orders_df['customer_id'] = orders_df['customer_id'].astype(int)

In [None]:
# Inspect orders after customer ids were replaced with database ids.
orders_df

In [None]:
# Remove the helper columns that were only needed for the customer-id mapping.
orders_df = orders_df.drop(columns=['customer_id_map','email','phone','customer_key'])

In [None]:
# Inspect orders after the helper columns were removed.
orders_df

In [None]:
# Columns to insert into the orders table; transaction_id stays behind in orders_df.
orders_df_db = orders_df[['customer_id','order_date','total_amount','status']].copy()

In [None]:
# Preview the orders frame that is about to be inserted.
orders_df_db

In [None]:
# load orders data into database
# The inserted-row count is only displayed as the cell output; it is not stored
# in data_quality.
load_data_db(df=orders_df_db,table_name='orders')

### Order items table

In [None]:
# Preview the sales rows that will become order items.
sales_df.head()

In [None]:
# Inspect the in-memory orders (still keyed by transaction_id).
orders_df

In [None]:
# fetch auto-generated order IDs from sql database
def fetch_orders_from_db():
    """Return all rows of the ``orders`` table as a list of tuples.

    Each tuple is ``(order_id, customer_id, order_date, total_amount)``, in
    the column order of the SELECT statement.
    """
    connection = get_mysql_connection()
    try:
        cursor = connection.cursor()
        try:
            cursor.execute("SELECT order_id, customer_id, order_date, total_amount FROM orders")
            rows = cursor.fetchall()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
    finally:
        connection.close()

    return rows

In [None]:
# Reattach order_id to your in-memory orders_df
# orders_df_db is rebuilt here from the database rows, replacing the frame of
# the same name that was used for the insert.
orders_db_rows = fetch_orders_from_db()

orders_df_db = pd.DataFrame(
    orders_db_rows,
    columns=['order_id', 'customer_id', 'order_date', 'total_amount']
)


In [None]:
# Inspect the orders as stored in the database.
orders_df_db

In [None]:
# normalize for safe matching
# Both frames get the same representation for the join keys: order_date as a
# date object and total_amount as a float rounded to 2 decimals.
orders_df_db['order_date'] = pd.to_datetime(orders_df_db['order_date']).dt.date
orders_df_db['total_amount'] = orders_df_db['total_amount'].astype(float).round(2)

orders_df['order_date'] = pd.to_datetime(orders_df['order_date']).dt.date
orders_df['total_amount'] = orders_df['total_amount'].astype(float).round(2)

In [None]:
# Dry run of the join: the _merge indicator shows how many in-memory orders
# find a database row ("both") and how many do not ("left_only").
test_merge = orders_df.merge(
    orders_df_db,
    on=['customer_id', 'order_date', 'total_amount'],
    how='left',
    indicator=True
)

print(test_merge['_merge'].value_counts())


In [None]:
# Merge order_id back using business attributes
# Inner join: in-memory orders without a matching database row are dropped.
# NOTE(review): the key (customer_id, order_date, total_amount) is not
# guaranteed unique - if several database rows share it (for example after the
# load cell was run twice), each matching order is repeated once per database
# row. Confirm uniqueness before relying on the result.
orders_df = orders_df.merge(
    orders_df_db,
    on=['customer_id', 'order_date', 'total_amount'],
    how='inner'
)

In [None]:
# Bring order_id into sales_df
# transaction_id is the link between a sales row and its order; the inner join
# drops sales rows whose transaction has no order_id.
orders_lookup_df = orders_df[['transaction_id','order_id']].copy()

sales_df = sales_df.merge(
    orders_lookup_df,
    on='transaction_id',
    how='inner'
)

sales_df = sales_df.copy()
# now sales_df has order_id

### Product mapping

Bring `product_name` into `sales_df`.

In [None]:
# Map product IDs (CSV -> MySQL)
# product_id_map: product_name -> database product_id
product_id_map = fetch_product_id_map()

In [None]:
# Inspect sales rows now that order_id is attached.
sales_df

In [None]:
# Lookup table from the CSV product_id to product_name.
products_lookup_df = products_df[['product_id', 'product_name']].copy()

In [None]:
# Left join: every sales row is kept; rows whose product_id is not in the
# lookup get a missing product_name.
sales_df = sales_df.merge(
    products_lookup_df,
    on='product_id',   # CSV product_id (P001)
    how='left'
)

In [None]:
# Inspect sales rows with product_name attached.
sales_df

In [None]:
# Number of sales rows that found no product_name in the lookup.
print(sales_df['product_name'].isna().sum())

In [None]:
# Overwrite the CSV product_id with the database product_id looked up by name;
# names missing from the map yield a missing id.
sales_df['product_id'] = sales_df['product_name'].map(product_id_map)

In [None]:
# Inspect sales rows after the product-id mapping.
sales_df

In [None]:
# Drop sales rows without a database product_id; the ids can only be cast to
# int once the gaps are gone.
sales_df = sales_df.dropna(subset=['product_id']).copy()
sales_df['product_id'] = sales_df['product_id'].astype(int)

In [None]:
# Inspect sales rows with integer product ids.
sales_df

In [None]:
# drop helper column - product_name (it was only needed for the id mapping)
sales_df = sales_df.drop(columns=['product_name'])

In [None]:
# compute subtotal (quantity x unit price, the same formula as total_amount)
sales_df['subtotal'] = sales_df['quantity'] * sales_df['unit_price']

In [None]:
# Inspect sales rows with the subtotal column.
sales_df

In [None]:
# creating order_items dataframe
# Keep only the columns that are inserted into the order_items table.
order_items_df = sales_df[['order_id','product_id','quantity','unit_price','subtotal']].copy()
order_items_df

In [None]:
# load order_items data into database
# The inserted order_items count is what gets reported as "records_loaded" for sales.
inserted_sales = load_data_db(df=order_items_df,table_name='order_items')
data_quality["sales"]["records_loaded"] = inserted_sales

In [None]:
# Final data-quality summary for customers, products and sales.
data_quality