## 1. Data Pipeline Development

In [None]:

import pandas as pd   # Data analysis library
import json           # For working with JSON 
import logging        # For logging messages 

sales_data = pd.read_json('sales_data.json')  # Load JSON file into DataFrame
sales_data.head()  # Show first 5 rows


Unnamed: 0,transaction_id,customer_id,product,quantity,date,region
0,T0001,C024,"{'id': 'P03', 'name': 'Monitor', 'category': '...",4,2024-11-05 06:02:53,North
1,T0002,C001,"{'id': 'P02', 'name': 'Mouse', 'category': 'Ac...",5,2024-06-17 11:20:53,East
2,T0003,C015,"{'id': 'P02', 'name': 'Mouse', 'category': 'Ac...",9,2025-05-05 18:33:04,South
3,T0004,C024,"{'id': 'P05', 'name': 'Smartphone', 'category'...",3,2024-03-03 10:05:28,North
4,T0005,C005,"{'id': 'P02', 'name': 'Mouse', 'category': 'Ac...",7,2024-07-20 04:41:53,North


In [3]:
# Flatten 'product' column and concat with DataFrame
sales_data = pd.concat(
    [
        sales_data.drop(columns=['product']),
        pd.json_normalize(sales_data['product'])
    ],
    axis=1
)

In [4]:
# Show first 5 rows of the sales_data DataFrame

sales_data.head()

Unnamed: 0,transaction_id,customer_id,quantity,date,region,id,name,category,price
0,T0001,C024,4,2024-11-05 06:02:53,North,P03,Monitor,Electronics,299.5
1,T0002,C001,5,2024-06-17 11:20:53,East,P02,Mouse,Accessories,19.99
2,T0003,C015,9,2025-05-05 18:33:04,South,P02,Mouse,Accessories,19.99
3,T0004,C024,3,2024-03-03 10:05:28,North,P05,Smartphone,Electronics,799.99
4,T0005,C005,7,2024-07-20 04:41:53,North,P02,Mouse,Accessories,19.99


In [5]:
# Rename DataFrame columns: change 'id' to 'product_id' and 'name' to 'product_name' for clarity

sales_data = sales_data.rename(columns={
    'id': 'product_id',
    'name': 'product_name'
})

In [6]:
sales_data.head()

Unnamed: 0,transaction_id,customer_id,quantity,date,region,product_id,product_name,category,price
0,T0001,C024,4,2024-11-05 06:02:53,North,P03,Monitor,Electronics,299.5
1,T0002,C001,5,2024-06-17 11:20:53,East,P02,Mouse,Accessories,19.99
2,T0003,C015,9,2025-05-05 18:33:04,South,P02,Mouse,Accessories,19.99
3,T0004,C024,3,2024-03-03 10:05:28,North,P05,Smartphone,Electronics,799.99
4,T0005,C005,7,2024-07-20 04:41:53,North,P02,Mouse,Accessories,19.99


In [None]:
# convert to string and keep only the first 10 characters (YYYY-MM-DD format)
sales_data['date'] = sales_data['date'].astype(str).str.slice(0, 10) 


In [25]:
# Convert 'date' column to datetime and format as 'YYYY-MM-DD' string
sales_data['date'] = pd.to_datetime(sales_data['date']).dt.strftime('%Y-%m-%d')

In [26]:
sales_data.head()

Unnamed: 0,transaction_id,customer_id,quantity,date,region,product_id,product_name,category,price,negative_quantity_flag,total_value
0,T0001,C024,4,2024-11-05,North,P03,Monitor,Electronics,299.5,False,1198.0
1,T0002,C001,5,2024-06-17,East,P02,Mouse,Accessories,19.99,False,99.95
2,T0003,C015,9,2025-05-05,South,P02,Mouse,Accessories,19.99,False,179.91
3,T0004,C024,3,2024-03-03,North,P05,Smartphone,Electronics,799.99,False,2399.97
4,T0005,C005,7,2024-07-20,North,P02,Mouse,Accessories,19.99,False,139.93


## 3. Data Quality Checks

In [10]:
# Configure logging to file with timestamp, level, and message

logging.basicConfig(
    filename='data_anomalies.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


In [11]:
# Count anomalies (duplicates, missing customer IDs, negative quantities) and log them

dup_count = sales_data.duplicated(subset=['transaction_id'], keep=False).sum()
missing_customer_count = sales_data['customer_id'].isna().sum()
negative_qty_count = (sales_data['quantity'] < 0).sum()

logging.info(f"Duplicate transaction_id count: {dup_count}")
logging.info(f"Missing customer_id count: {missing_customer_count}")
logging.info(f"Negative quantity count: {negative_qty_count}")

In [12]:
# Replace missing customer IDs with 'Unknown'

sales_data['customer_id'] = sales_data['customer_id'].fillna('Unknown')

In [13]:
# Display DataFrame summary
sales_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500 entries, 0 to 499
Data columns (total 9 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   transaction_id  500 non-null    object 
 1   customer_id     500 non-null    object 
 2   quantity        500 non-null    int64  
 3   date            500 non-null    object 
 4   region          500 non-null    object 
 5   product_id      500 non-null    object 
 6   product_name    500 non-null    object 
 7   category        500 non-null    object 
 8   price           500 non-null    float64
dtypes: float64(1), int64(1), object(7)
memory usage: 35.3+ KB


In [14]:
# Flag negative quantities and set them to 0

sales_data['negative_quantity_flag'] = sales_data['quantity'] < 0
sales_data.loc[sales_data['negative_quantity_flag'], 'quantity'] = 0

In [15]:
sales_data.head()

Unnamed: 0,transaction_id,customer_id,quantity,date,region,product_id,product_name,category,price,negative_quantity_flag
0,T0001,C024,4,2024-11-05,North,P03,Monitor,Electronics,299.5,False
1,T0002,C001,5,2024-06-17,East,P02,Mouse,Accessories,19.99,False
2,T0003,C015,9,2025-05-05,South,P02,Mouse,Accessories,19.99,False
3,T0004,C024,3,2024-03-03,North,P05,Smartphone,Electronics,799.99,False
4,T0005,C005,7,2024-07-20,North,P02,Mouse,Accessories,19.99,False


In [16]:
# Calculate total_value
sales_data['total_value'] = sales_data['quantity'] * sales_data['price']

In [17]:
sales_data.head()

Unnamed: 0,transaction_id,customer_id,quantity,date,region,product_id,product_name,category,price,negative_quantity_flag,total_value
0,T0001,C024,4,2024-11-05,North,P03,Monitor,Electronics,299.5,False,1198.0
1,T0002,C001,5,2024-06-17,East,P02,Mouse,Accessories,19.99,False,99.95
2,T0003,C015,9,2025-05-05,South,P02,Mouse,Accessories,19.99,False,179.91
3,T0004,C024,3,2024-03-03,North,P05,Smartphone,Electronics,799.99,False,2399.97
4,T0005,C005,7,2024-07-20,North,P02,Mouse,Accessories,19.99,False,139.93


In [27]:
# Count duplicates by transaction_id
sales_data.duplicated(subset=['transaction_id']).sum()

np.int64(0)

## 2. Database Design and Querying


In [24]:
# List all column names in the DataFrame

sales_data.columns

Index(['transaction_id', 'customer_id', 'quantity', 'date', 'region',
       'product_id', 'product_name', 'category', 'price',
       'negative_quantity_flag', 'total_value'],
      dtype='object')

In [30]:
# Create transactions_df with unique transaction records and reset index

transactions_df = (
    sales_data[['transaction_id', 'customer_id', 'date', 'region']]
    .drop_duplicates()
    .reset_index(drop=True)
)

In [31]:
# Create products_df with unique product records and reset index

products_df = (
    sales_data[['product_id', 'product_name', 'category', 'price']]
    .drop_duplicates()
    .reset_index(drop=True)
)

In [32]:
# Create customers_df with unique customer IDs and reset index

customers_df = (
    sales_data[['customer_id']]
    .drop_duplicates()
    .reset_index(drop=True)
)

In [33]:
# Create transaction_items_df with unique transaction-product details and reset index

transaction_items_df = (
    sales_data[['transaction_id', 'product_id', 'quantity', 'price',
                'total_value', 'negative_quantity_flag']]
    .drop_duplicates()
    .reset_index(drop=True)
)

## 4. Performance Optimization

### An index in SQL is a database feature that speeds up data retrieval by creating a quick lookup structure for specified columns.

In [34]:
# Create SQLAlchemy engine to connect to MySQL database 'sales_DB' on localhost with user root

from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:1234@localhost:3306/sales_DB", echo=False)

In [36]:
# Load normalized DataFrames into MySQL tables with batch inserts

transactions_df.to_sql(
    'transactions',
    con=engine,
    if_exists='replace',  
    index=False,
    chunksize=100,         
    method='multi'        
)

products_df.to_sql(
    'products',
    con=engine,
    if_exists='replace',
    index=False,
    chunksize=100,
    method='multi'
)

customers_df.to_sql(
    'customers',
    con=engine,
    if_exists='replace',
    index=False,
    chunksize=100,
    method='multi'
)

transaction_items_df.to_sql(
    'transaction_items',
    con=engine,
    if_exists='replace',
    index=False,
    chunksize=100,
    method='multi'
)

500