In [1]:
pip install mysql-connector-python


Collecting mysql-connector-python
  Downloading mysql_connector_python-9.1.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (6.0 kB)
Downloading mysql_connector_python-9.1.0-cp310-cp310-manylinux_2_28_x86_64.whl (34.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.4/34.4 MB[0m [31m37.4 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hInstalling collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.1.0
Note: you may need to restart the kernel to use updated packages.


**1.Establish Connection with MySQL**

In [42]:
import pandas as pd
import mysql.connector
from datetime import datetime

# 1. Create database connection
def create_db_connection():
    try:
        db = mysql.connector.connect(
            host="mysql-204e2163-bagdesameer92-369f.d.aivencloud.com",
            user="avnadmin",
            password="AVNS__zNcky48VFw4ThNwd6c",
            database="defaultdb",
            port=21095,
            ssl_ca='/kaggle/input/interview/ca.pem'  # CA cert path for SSL connection
        )
        return db
    except mysql.connector.Error as err:
        print(f"Error connecting to database: {err}")
        return None

**2. Extract: Data Collection**

extract_data(db):

This part of the ETL process is responsible for extracting the raw data from a source (in this case, MySQL database tables).

The extract_data function connects to the database and retrieves data from five tables: ProductInformation, InventoryDetails, Pricing, SalesMetrics, and ReplenishmentInformation.
Each table is queried using SQL, and the results are stored in pandas DataFrames for further processing.
The function uses the pd.read_sql method to directly load the data from the SQL queries into pandas DataFrames.
If there's an error in extraction, the function returns None.

In [43]:
# 2. Extract: Data Collection
def extract_data(db):
    try:
        queries = {
            'Products': "SELECT * FROM Products",
            'Inventory': "SELECT * FROM Inventory",
            'Sales': "SELECT * FROM Sales"
        }

        dataframes = {}
        for name, query in queries.items():
            dataframes[name] = pd.read_sql(query, db)
            print(f"Columns in {name} DataFrame: {dataframes[name].columns.tolist()}")  # Debugging line
        
        return dataframes
    except Exception as e:
        print(f"Error during data extraction: {e}")
        return None

**3. Transform: Data Processing and Transformation**
transform_data(dataframes):

In this stage, the extracted data is processed and transformed into a format that is suitable for loading into the target database. The transformation involves data cleaning, merging, and calculating derived fields.

Steps involved:

Merging: Data from multiple sources (product_info, inventory, pricing, sales, replenishment) is merged into a single DataFrame using pandas merge() function. The key for merging is the product_id.
Calculating Derived Fields:
total_cost is calculated as inventory_qty * holding_cost_per_unit.
total_profit is calculated as total_sales_value - total_cost.
avg_monthly_sales is calculated as total_sales_value / 12 (assuming monthly breakdown over 12 months).
Handling Missing Data: The fillna(0) method is used to replace NaN values with zeros.
Date Formatting: The inventory_date and replenishment_date columns are converted to the format YYYY-MM-DD.

In [47]:




def transform_data(dataframes):
    try:
        # Ensure 'product_id' exists in all dataframes
        for df_name, df in dataframes.items():
            if 'product_id' not in df.columns:
                print(f"Warning: 'product_id' missing in {df_name} DataFrame")
        
        # Merge all dataframes on 'product_id'
        merged_df = dataframes['Products']
        
        # Merge Inventory Data
        merged_df = merged_df.merge(dataframes['Inventory'], on='product_id', how='left')
        
        # Merge Sales Data
        merged_df = merged_df.merge(dataframes['Sales'], on='product_id', how='left')
        
        # If 'season' and 'profit_margin_pct' columns are missing, add them with default values
        if 'season' not in merged_df.columns:
            merged_df['season'] = 'Unknown'  # Default value
        if 'profit_margin_pct' not in merged_df.columns:
            merged_df['profit_margin_pct'] = 0.0  # Default value
        
        # Calculate derived fields
        merged_df['inventory_value'] = merged_df['inventory_qty'] * merged_df['original_price_per_unit']
        merged_df['total_sales_value'] = merged_df['items_sold'] * merged_df['sale_price_per_unit']
        merged_df['total_profit'] = merged_df['total_sales_value'] - merged_df['inventory_value']
        
        # Calculate averages
        merged_df['avg_items_sold_per_month'] = merged_df['items_sold'] / 12
        merged_df['avg_sales_revenue_per_month'] = merged_df['total_sales_value'] / 12
        merged_df['avg_profit_per_month'] = merged_df['total_profit'] / 12
        
        # Clean missing data by filling NaN values with zero
        merged_df.fillna(0, inplace=True)
        
        # Convert 'inventory_date' to proper date format
        merged_df['inventory_date'] = pd.to_datetime(merged_df['inventory_date']).dt.strftime('%Y-%m-%d')
        
        # Ensure 'lead_time_days' is an integer and replace NaN with 0
        merged_df['lead_time_days'] = merged_df['lead_time_days'].fillna(0).astype(int)
        
        # Filter columns to match the target structure (only the needed columns)
        merged_df = merged_df[['product_id', 'product_name', 'inventory_date', 'inventory_qty', 
                               'original_price_per_unit', 'inventory_value', 'sale_price_per_unit', 
                               'items_sold', 'total_sales_value', 'total_profit', 
                               'avg_items_sold_per_month', 'avg_sales_revenue_per_month', 
                               'avg_profit_per_month', 'warehouse_location', 'lead_time_days', 'season']]
        
        return merged_df
    except Exception as e:
        print(f"Error during data transformation: {e}")
        return None

**3. Load: Loading the Transformed Data into Target Database**
load_data(db, cursor, merged_df):

In this step, the transformed data is loaded into a target database (in this case, the same MySQL database but into a new table called TransformedProductData).

Steps involved:

Truncate Existing Data: The table TransformedProductData is cleared using TRUNCATE TABLE to ensure there are no old records.
Prepare Insert Query: The SQL insert query is prepared with placeholders (%s) for each value to be inserted into the table.
Inserting Data: Data from the transformed DataFrame (merged_df) is inserted row by row into the TransformedProductData table. Each row is converted into a tuple of values that corresponds to the table columns.
Commit the Transaction: The commit() method ensures that the changes are saved to the database.
Error Handling: In case of an error, a rollback() is triggered to undo any changes made during the process.

In [48]:

# 4. Load: Loading the Transformed Data into Target Database
def load_data(db, cursor, merged_df):
    try:
        # Create table if it doesn't exist (only relevant columns)
        create_table_query = """
        CREATE TABLE IF NOT EXISTS TransformedProductData (
            product_id INT PRIMARY KEY,
            product_name VARCHAR(255),
            inventory_date DATE,
            inventory_qty INT,
            original_price_per_unit DECIMAL(10,2),
            inventory_value DECIMAL(10,2),
            sale_price_per_unit DECIMAL(10,2),
            total_items_sold INT,
            total_sales_value DECIMAL(10,2),
            total_profit DECIMAL(10,2),
            avg_items_sold_per_month DECIMAL(10,2),
            avg_sales_revenue_per_month DECIMAL(10,2),
            avg_profit_per_month DECIMAL(10,2),
            warehouse_location VARCHAR(255),
            lead_time_days INT,
            season VARCHAR(50)
        );
        """
        cursor.execute(create_table_query)
        
        # Optionally truncate the table before loading new data
        cursor.execute("TRUNCATE TABLE TransformedProductData;")
        
        # Make sure the number of columns and placeholders match
        insert_query = """
        INSERT INTO TransformedProductData (
            product_id, product_name, inventory_date, inventory_qty, original_price_per_unit, 
            inventory_value, sale_price_per_unit, total_items_sold, total_sales_value, total_profit,
            avg_items_sold_per_month, avg_sales_revenue_per_month, avg_profit_per_month, warehouse_location, 
            lead_time_days, season
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        
        # Insert data into the table
        for _, row in merged_df.iterrows():
            values = (
                row['product_id'], row['product_name'], row['inventory_date'], row['inventory_qty'],
                row['original_price_per_unit'], row['inventory_value'], row['sale_price_per_unit'],
                row['items_sold'], row['total_sales_value'], row['total_profit'],
                row['avg_items_sold_per_month'], row['avg_sales_revenue_per_month'],
                row['avg_profit_per_month'], row['warehouse_location'], row['lead_time_days'], row['season']
            )
            cursor.execute(insert_query, values)
        
        db.commit()
        print(f"Successfully loaded {len(merged_df)} records into the database")
    except Exception as e:
        db.rollback()
        print(f"Error during data loading: {e}")

**Putting It All Together**

The main function orchestrates the entire ETL process. It:

Connects to the MySQL Database using create_db_connection().
Extracts Data from the source tables with extract_data().
Transforms the Data using transform_data() (merging, calculating new fields, and cleaning the data).
Creates a Target Table if it doesn't exist with create_target_table().
Loads the Transformed Data into the target table with load_data().
Handles exceptions and ensures that connections are properly closed at the end of the process.

In [4]:

# Main ETL Process
def main():
    try:
        db = create_db_connection()
        if not db:
            return
        
        cursor = db.cursor()
        
        print("Extracting data...")
        dataframes = extract_data(db)
        if not dataframes:
            return
        
        print("Transforming data...")
        merged_df = transform_data(dataframes)
        if merged_df is None:
            return
        
        print("Loading data...")
        load_data(db, cursor, merged_df)
        
        print("ETL process completed successfully!")
    
    except Exception as e:
        print(f"ETL process failed: {e}")
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'db' in locals():
            db.close()
            print("Database connection closed.")

if __name__ == "__main__":
    main()




Extracting data...


  dataframes[name] = pd.read_sql(query, db)


Columns in Products DataFrame: ['product_id', 'product_name', 'original_price_per_unit', 'sale_price_per_unit', 'lead_time_days', 'seasonal_sales']
Columns in Inventory DataFrame: ['inventory_id', 'product_id', 'inventory_date', 'inventory_qty', 'inventory_value', 'warehouse_location']
Columns in Sales DataFrame: ['sales_id', 'product_id', 'sales_date', 'items_sold', 'sales_value', 'profit']
Transforming data...
Loading data...
Successfully loaded 14 records into the database
ETL process completed successfully!
Database connection closed.


**Summary of the ETL Process**
Extract: Data is fetched from multiple MySQL tables using SQL queries.
Transform: Data is processed to create new metrics, cleaned, and merged into a single DataFrame.
Load: The transformed data is inserted into a new table in the MySQL database after truncating any existing data.
The ETL process automates the movement and transformation of data to make it useful for analysis and reporting. It ensures that the target table contains up-to-date, clean, and comprehensive data for business intelligence.