# Automated Data Ingestion Pipeline
**Objective:** Automate the extraction of raw CSV data and load it into the MySQL database using Python.

This script replaces the manual "Import Wizard" process, ensuring reproducibility.

In [1]:
%pip install pandas sqlalchemy mysql-connector-python notebook


In [3]:
import os
from getpass import getpass

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.types import Text

# 1. DATABASE CONFIGURATION
DB_USER = input("database user: ")
# getpass hides the password as it is typed and keeps it out of the saved output
DB_PASSWORD = getpass("database password: ")
DB_PORT = input("database port: ")
DB_HOST = 'localhost'
DB_NAME = 'toy_store_ecommerce2'

# Create SQLAlchemy Engine
connection_string = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}"
server_engine = create_engine(connection_string)

print("Database engine created successfully.")

Database engine created successfully.
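
The connection string above interpolates the credentials directly, so a password containing characters such as `@`, `/` or `:` would be misread as part of the URL. Below is a minimal sketch of one way to guard against that using only the standard library. The helper name `build_mysql_url` and the sample password are assumptions for illustration and do not appear in the original notebook.

```python
from urllib.parse import quote


def build_mysql_url(user, password, host, port, database=None):
    """Return a mysql+mysqlconnector URL with percent-encoded credentials.

    Hypothetical helper for illustration; not part of the original notebook.
    """
    # safe="" makes quote() encode every reserved character, including "/" and "@".
    url = (
        f"mysql+mysqlconnector://{quote(user, safe='')}:"
        f"{quote(password, safe='')}@{host}:{port}"
    )
    if database:
        url += f"/{database}"
    return url


# Made-up password containing URL-reserved characters.
print(build_mysql_url("root", "p@ss/word", "localhost", "3306"))
```

The returned string could be passed to `create_engine` in place of `connection_string`. SQLAlchemy's own `URL.create` is an alternative that handles the escaping internally.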


In [4]:
# 2. DATABASE CREATION

print("Starting Setup Process...\n")

# Step A: Connect to MySQL Server
try:
    print(f"🔌 Connecting to MySQL server at {DB_HOST}...")
    with server_engine.connect() as conn:
        conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}"))
        print(f"Database '{DB_NAME}' created successfully (or already exists).")
except Exception as e:
    print(f"Error creating database: {e}")
    raise  # Re-raise so later cells do not run against a missing database

# Step B: Connect to the specific Database
db_conn_str = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(db_conn_str)

Starting Setup Process...

🔌 Connecting to MySQL server at localhost...
Database 'toy_store_ecommerce2' created successfully (or already exists).
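
Identifiers such as a database name cannot be sent as bound parameters, which is why the `CREATE DATABASE` statement above builds its SQL with an f-string. That is safe while `DB_NAME` is hard-coded, but it becomes risky if the name ever comes from user input or a config file. A minimal sketch of a conservative check follows. The helper `quote_mysql_identifier` and its whitelist pattern are assumptions for illustration, not part of the original notebook.

```python
import re

# Conservative whitelist: letters, digits and underscores, at most 64 characters
# (64 is MySQL's maximum length for a database name).
_IDENTIFIER_RE = re.compile(r"[A-Za-z0-9_]{1,64}")


def quote_mysql_identifier(name):
    """Validate a database name and wrap it in backticks. (Hypothetical helper.)"""
    if not _IDENTIFIER_RE.fullmatch(name):
        raise ValueError(f"Unsafe database name: {name!r}")
    return f"`{name}`"


create_sql = f"CREATE DATABASE IF NOT EXISTS {quote_mysql_identifier('toy_store_ecommerce2')}"
print(create_sql)
```

The resulting string could then be wrapped in `text(...)` and executed exactly as in the cell above.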


In [5]:
# 3. FILE MAPPING (CSV -> SQL Table)

# Dictionary mapping raw CSV filenames to desired SQL table names
files_to_load = {
    'orders.csv': 'raw_orders',
    'order_items.csv': 'raw_order_items',
    'order_item_refunds.csv': 'raw_order_item_refunds',
    'products.csv': 'raw_products',
    'website_pageviews.csv': 'raw_website_pageviews',
    'website_sessions.csv': 'raw_website_sessions'
}

# Path to raw data folder
data_folder = r'C:\Users\karol\Desktop\Portfolio\toy store ecommerce'

In [6]:
# 4. ETL EXECUTION LOOP

for file_name, table_name in files_to_load.items():
    file_path = os.path.join(data_folder, file_name)
    
    try:
        print(f"Processing: {file_name} -> Table: {table_name}...")
        
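        # A. EXTRACT: Read every column as text so the raw table mirrors the CSV.
        # dtype=str keeps missing values as NaN (written to SQL as NULL), whereas
        # calling df.astype(str) afterwards would store the literal string 'nan'.
        df = pd.read_csv(file_path, dtype=str)
        dtype_dict = {col: Text() for col in df.columns}

        # B. LOAD: Write DataFrame to SQL, replacing the table if it already exists
        df.to_sql(name=table_name,
                  con=engine,
                  if_exists='replace',
                  index=False,
                  chunksize=1000,
                  dtype=dtype_dict)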
        
        print(f"Success! Loaded {len(df)} rows into '{table_name}'.")
        
    except Exception as e:
        print(f"Error loading {file_name}: {e}")

print("\n All files processed. Data ingestion complete.")

Processing: orders.csv -> Table: raw_orders...
Success! Loaded 32313 rows into 'raw_orders'.
Processing: order_items.csv -> Table: raw_order_items...
Success! Loaded 40025 rows into 'raw_order_items'.
Processing: order_item_refunds.csv -> Table: raw_order_item_refunds...
Success! Loaded 1731 rows into 'raw_order_item_refunds'.
Processing: products.csv -> Table: raw_products...
Success! Loaded 4 rows into 'raw_products'.
Processing: website_pageviews.csv -> Table: raw_website_pageviews...
Success! Loaded 1188124 rows into 'raw_website_pageviews'.
Processing: website_sessions.csv -> Table: raw_website_sessions...
Success! Loaded 472871 rows into 'raw_website_sessions'.

 All files processed. Data ingestion complete.
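
The loop above catches each exception and moves on, so the closing message is printed even when one of the files failed to load. Below is a sketch of one way to make failures visible at the end of the run. It assumes a hypothetical `load_one(file_name, table_name)` callable standing in for the `read_csv` / `to_sql` body of the loop; `run_all` and `demo_loader` are likewise illustrative names, not part of the original notebook.

```python
def run_all(files_to_load, load_one):
    """Call load_one for every (file, table) pair and return failures by file name.

    load_one is a hypothetical callable wrapping the extract-and-load steps.
    """
    failures = {}
    for file_name, table_name in files_to_load.items():
        try:
            load_one(file_name, table_name)
        except Exception as exc:  # keep going, but remember what went wrong
            failures[file_name] = str(exc)
    return failures


def demo_loader(file_name, table_name):
    # Stand-in loader that fails for one file so the summary has something to show.
    if file_name == "products.csv":
        raise FileNotFoundError(file_name)


failures = run_all(
    {"orders.csv": "raw_orders", "products.csv": "raw_products"},
    demo_loader,
)
if failures:
    print(f"{len(failures)} file(s) failed: {sorted(failures)}")
else:
    print("All files processed. Data ingestion complete.")
```

Returning the failures instead of only printing them lets a later cell decide whether to stop, retry, or carry on with the tables that did load.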
