# TechStore - ETL Pipeline
## Business Intelligence Project
### Data Extraction, Transformation, and Loading

In [31]:
import pandas as pd
import numpy as np
import os
import sys
import warnings
from pathlib import Path
from datetime import datetime

warnings.filterwarnings('ignore')

In [32]:
PROJECT_ROOT = Path.cwd()
SCRIPTS_DIR = PROJECT_ROOT / 'scripts'
DATA_DIR = PROJECT_ROOT / 'Data'
EXTRACTED_DIR = DATA_DIR / 'extracted'
TRANSFORMED_DIR = DATA_DIR / 'transformed'
DATABASE_DIR = PROJECT_ROOT / 'database'

for directory in [EXTRACTED_DIR, TRANSFORMED_DIR, DATABASE_DIR]:
    directory.mkdir(parents=True, exist_ok=True)

sys.path.insert(0, str(SCRIPTS_DIR))

print("="*70)
print("TECHSTORE - ETL PIPELINE")
print("="*70)
print(f"Project Root: {PROJECT_ROOT}")
print(f"Execution: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*70)

TECHSTORE - ETL PIPELINE
Project Root: c:\Users\windows\TechStore\src
Execution: 2026-01-27 22:30:04


## 1. Data Extraction

In [33]:
print("\n" + "="*70)
print("MYSQL EXTRACTION")
print("="*70)

MYSQL_CONFIG = {
    'host': 'boughida.com',
    'database': 'techstore_erp',
    'user': 'student_user_4ing',
    'password': 'bi_guelma_2025'
}

try:
    from extract_mysql import MySQLExtractor
    
    extractor = MySQLExtractor(**MYSQL_CONFIG)
    
    if extractor.connect():
        print("MySQL connection established\n")
        extraction_summary = extractor.extract_all_tables()
        extractor.close()
        print(f"\n{len(extraction_summary)} tables extracted successfully")
    else:
        print("MySQL connection failed")
        
except Exception as e:
    print(f"Error in MySQL extraction: {e}")


MYSQL EXTRACTION
2026-01-27 22:30:05,373 - INFO - Connexion réussie à techstore_erp
MySQL connection established

2026-01-27 22:30:05,376 - INFO - 
2026-01-27 22:30:05,378 - INFO -  DÉBUT DE L'EXTRACTION COMPLÈTE

2026-01-27 22:30:05,713 - INFO - 
Informations sur table_sales:
2026-01-27 22:30:05,715 - INFO -    Nombre de lignes: 25000
2026-01-27 22:30:05,716 - INFO -    Colonnes: Trans_ID, Date, Store_ID, Product_ID, Customer_ID, Quantity, Total_Revenue
2026-01-27 22:30:05,719 - INFO -  Extraction de la table: table_sales
2026-01-27 22:30:08,307 - INFO - 25000 lignes extraites de table_sales
2026-01-27 22:30:08,311 - INFO - Fichier sauvegardé: data/extracted/sales.csv
2026-01-27 22:30:08,312 - INFO - 
------------------------------------------------------------

2026-01-27 22:30:08,643 - INFO - 
Informations sur table_products:
2026-01-27 22:30:08,645 - INFO -    Nombre de lignes: 38
2026-01-27 22:30:08,647 - INFO -    Colonnes: Product_ID, Product_Name, SubCat_ID, Unit_Price, Unit_C

In [34]:
print("\n" + "="*70)
print("WEB SCRAPING - COMPETITOR PRICES")
print("="*70)

try:
    from scrape_competitors import scrape_with_fallback
    
    df_competitor = scrape_with_fallback()
    
    if df_competitor is not None and len(df_competitor) > 0:
        print(f"\n{len(df_competitor)} competitor prices extracted")
        print(f"File: {EXTRACTED_DIR / 'competitor_prices.csv'}")
    else:
        print("No competitor data retrieved")
        
except Exception as e:
    print(f"Error in web scraping: {e}")


WEB SCRAPING - COMPETITOR PRICES
2026-01-27 22:30:12,458 - INFO - Starting web scraping process
2026-01-27 22:30:12,462 - INFO - Fetching page: https://boughida.com/competitor/
2026-01-27 22:30:13,868 - INFO - Page successfully retrieved
2026-01-27 22:30:13,880 - INFO - Found additional pages: 2
2026-01-27 22:30:13,883 - INFO - 
Page 1/3: https://boughida.com/competitor/
2026-01-27 22:30:13,885 - INFO - ------------------------------------------------------------
2026-01-27 22:30:13,888 - INFO - Fetching page: https://boughida.com/competitor/
2026-01-27 22:30:15,144 - INFO - Page successfully retrieved
2026-01-27 22:30:15,152 - INFO - Found 13 potential product containers
2026-01-27 22:30:15,158 - INFO - Extracted: Samsung S23 Ultra: 174800.0 DZD
2026-01-27 22:30:15,161 - INFO - Extracted: HP LaserJet Pro: 46000.0 DZD
2026-01-27 22:30:15,164 - INFO - Extracted: Dell 24 Monitor: 24200.0 DZD
2026-01-27 22:30:15,167 - INFO - Extracted: Canon i-Sensys: 39000.0 DZD
2026-01-27 22:30:15,169 

In [35]:
print("\n" + "="*70)
print("OCR - LEGACY INVOICES (OPTIONAL)")
print("="*70)

invoice_dir = DATA_DIR / 'legacy_invoices'
has_invoices = invoice_dir.exists() and len(list(invoice_dir.glob('*.jpg'))) > 0

if has_invoices:
    print(f"{len(list(invoice_dir.glob('*.jpg')))} invoices detected\n")
    
    try:
        from extract_legacy_invoices import InvoiceOCRProcessor
        
        processor = InvoiceOCRProcessor(str(invoice_dir))
        df_legacy = processor.process_and_save()
        
        if df_legacy is not None and len(df_legacy) > 0:
            print(f"\n{len(df_legacy)} invoices processed")
        else:
            print("No data extracted from invoices")
            
    except Exception as e:
        print(f"OCR error: {e}")
else:
    print("No invoices found (optional step)")


OCR - LEGACY INVOICES (OPTIONAL)
5 invoices detected

2026-01-27 22:30:22,483 - INFO - Tesseract found in PATH: C:\Program Files\Tesseract-OCR\tesseract.EXE
2026-01-27 22:30:22,504 - INFO - STARTING OCR INVOICE PROCESSING
2026-01-27 22:30:22,512 - INFO - Found 5 invoice images to process

2026-01-27 22:30:22,515 - INFO - 
2026-01-27 22:30:22,522 - INFO - PROCESSING [1/5]: order_001.jpg


2026-01-27 22:30:26,178 - INFO - Extracted from order_001.jpg:
2026-01-27 22:30:26,182 - INFO -   [OK] Date: 2022-09-22
2026-01-27 22:30:26,186 - INFO -   [OK] Order_Reference: ORD-5073
2026-01-27 22:30:26,190 - INFO -   [OK] Customer_ID: C1001
2026-01-27 22:30:26,193 - INFO -   [OK] Customer_Name: Sami Oukil
2026-01-27 22:30:26,233 - INFO -   [OK] Product_Name: HP Victus 15
2026-01-27 22:30:26,238 - INFO -   [OK] Quantity: 2
2026-01-27 22:30:26,240 - INFO -   [OK] Unit_Price: 125000.0
2026-01-27 22:30:26,243 - INFO -   [OK] Total_Revenue: 250000.0
2026-01-27 22:30:26,245 - INFO - 
2026-01-27 22:30:26,248 - INFO - PROCESSING [2/5]: order_002.jpg
2026-01-27 22:30:29,616 - INFO - Extracted from order_002.jpg:
2026-01-27 22:30:29,622 - INFO -   [OK] Date: 2022-02-20
2026-01-27 22:30:29,624 - INFO -   [--] Order_Reference: None
2026-01-27 22:30:29,625 - INFO -   [OK] Customer_ID: C1003
2026-01-27 22:30:29,629 - INFO -   [OK] Customer_Name: Meriem Bouzid
2026-01-27 22:30:29,630 - INFO -   [

## 2. Data Transformation

In [36]:
print("\n" + "="*70)
print("LOADING EXTRACTED DATA")
print("="*70)

dataframes = {}

csv_files = {
    'sales': 'sales.csv',
    'products': 'products.csv',
    'customers': 'customers.csv',
    'stores': 'stores.csv',
    'cities': 'cities.csv',
    'categories': 'categories.csv',
    'subcategories': 'subcategories.csv',
    'reviews': 'reviews.csv',
    'competitor_prices': 'competitor_prices.csv'
}

for name, filename in csv_files.items():
    filepath = EXTRACTED_DIR / filename
    if filepath.exists():
        dataframes[name] = pd.read_csv(filepath)
        print(f"{name:20} {len(dataframes[name]):>7,} rows")
    else:
        print(f"{name:20} File not found")

print(f"\n{len(dataframes)} files loaded")


LOADING EXTRACTED DATA
sales                 25,000 rows
products                  38 rows
customers              1,200 rows
stores                    12 rows
cities                    12 rows
categories                 5 rows
subcategories             15 rows
reviews                3,000 rows
competitor_prices         38 rows

9 files loaded


In [37]:
print("\n" + "="*70)
print("RUNNING TRANSFORMATION PIPELINE")
print("="*70)

try:
    os.chdir(SCRIPTS_DIR)
    import transform_data
    transform_data.main()
    os.chdir(PROJECT_ROOT)
    print("\nTransformation completed successfully")
    
except Exception as e:
    print(f"Transformation error: {e}")
    import traceback
    traceback.print_exc()
    os.chdir(PROJECT_ROOT)


RUNNING TRANSFORMATION PIPELINE
ETL PIPELINE - TRANSFORMATION PHASE

[1/8] Loading flat files...
✓ Flat files loaded successfully
   Marketing: (180, 4)
   Targets:   (432, 4)
   Shipping:  (13, 4)

[2/8] Cleaning dataframes...
✓ Cleaning completed

[3/8] Harmonizing currency...
✓ USD → DZD conversion done

[4/8] Analyzing sentiment...
✓ Sentiment analysis done (38 products)

[5/8] Creating dimension tables...
✓ 38 competitor prices matched
✓ dim_product created (38 products)
   Price range: 800.00 - 360000.00 DZD
   Categories: 5 unique
✓ dim_store created (12 stores)
   Regions: 4 unique
   Average monthly target: 3,132,539.62 DZD
   Average annual target: 37,590,475.47 DZD
   Initial rows: 1200
   Found 90 homonymes (same name + city) → removing
   Examples of homonymes found:
      'Amel Amrani' in city 11: customer_ids [85, 1148]
      'Amel Chaouch' in city 3: customer_ids [698, 923]
      'Amel Khelifa' in city 7: customer_ids [496, 558]
      'Amine Djedid' in city 8: customer

In [38]:
print("\n" + "="*70)
print("VERIFYING TRANSFORMED FILES")
print("="*70)

transformed_files = {
    'Dim_Customer': 'Dim_Customer.csv',
    'Dim_Product': 'Dim_Product.csv',
    'Dim_Store': 'Dim_Store.csv',
    'Dim_Date': 'Dim_Date.csv',
    'Fact_Sales': 'Fact_Sales.csv'
}

transformed_data = {}

for name, filename in transformed_files.items():
    filepath = TRANSFORMED_DIR / filename
    if filepath.exists():
        df = pd.read_csv(filepath)
        transformed_data[name] = df
        print(f"{name:20} {len(df):>7,} rows x {len(df.columns):>2} columns")
    else:
        print(f"{name:20} File not found")

print(f"\n{len(transformed_data)}/5 tables transformed")


VERIFYING TRANSFORMED FILES
Dim_Customer           1,110 rows x  5 columns
Dim_Product               38 rows x 14 columns
Dim_Store                 12 rows x  8 columns
Dim_Date                 731 rows x 10 columns
Fact_Sales            25,000 rows x 12 columns

5/5 tables transformed


In [39]:
if len(transformed_data) >= 5:
    print("\n" + "="*70)
    print("STAR SCHEMA PREVIEW")
    print("="*70)
    
    print("\nDIMENSIONS:")
    for dim in ['Dim_Customer', 'Dim_Product', 'Dim_Store', 'Dim_Date']:
        if dim in transformed_data:
            df = transformed_data[dim]
            print(f"\n{dim}:")
            print(f"  Rows: {len(df):,}")
            print(f"  Columns: {', '.join(df.columns.tolist()[:5])}...")
            print(df.head(2).to_string(index=False))
    
    print("\nFACT TABLE:")
    if 'Fact_Sales' in transformed_data:
        df = transformed_data['Fact_Sales']
        print(f"\nFact_Sales:")
        print(f"  Rows: {len(df):,}")
        print(f"  Columns: {', '.join(df.columns.tolist())}")
        print(df.head(3).to_string(index=False))
        
        print(f"\nFinancial Summary:")
        print(f"  Total Revenue: {df['total_revenue'].sum():,.2f} DZD")
        print(f"  Net Profit: {df['net_profit'].sum():,.2f} DZD")
        print(f"  Profit Margin: {(df['net_profit'].sum() / df['total_revenue'].sum() * 100):.2f}%")


STAR SCHEMA PREVIEW

DIMENSIONS:

Dim_Customer:
  Rows: 1,110
  Columns: customer_id, full_name, city_id, city_name, region...
 customer_id   full_name  city_id city_name region
           1 Fares Mekki        9    Biskra  South
           2 Zineb Oukil       10  Ghardaia  South

Dim_Product:
  Rows: 38
  Columns: product_id, product_name, subcategory_id, subcategory_name, category_id...
 product_id product_name  subcategory_id subcategory_name  category_id category_name  unit_price  unit_cost  competitor_price  price_difference  price_difference_pct  avg_sentiment  avg_rating  review_count
        100 HP Victus 15               1          Laptops            1     Computers    125000.0    87901.0          119700.0            5300.0                  4.43           0.22        3.54            97
        101  Dell XPS 13               1          Laptops            1     Computers    260000.0   167880.0          242700.0           17300.0                  7.13           0.24        3.55  

## 3. Database Loading

In [40]:
print("\n" + "="*70)
print("CREATING DATA WAREHOUSE")
print("="*70)

try:
    os.chdir(SCRIPTS_DIR)
    exec(open('create_database.py').read())
    os.chdir(PROJECT_ROOT)
    print("\nData warehouse created successfully")
    
except Exception as e:
    print(f"Database creation error: {e}")
    import traceback
    traceback.print_exc()
    os.chdir(PROJECT_ROOT)


CREATING DATA WAREHOUSE
TECHSTORE DATA WAREHOUSE - DATABASE LOADING
Star Schema: 1 Fact Table + 4 Dimension Tables

[1/4] Loading transformed Star Schema files...
  Dim_Customer: 1,110 rows loaded
  Dim_Date: 731 rows loaded
  Dim_Product: 38 rows loaded
  Dim_Store: 12 rows loaded
  Fact_Sales: 25,000 rows loaded

[2/4] Standardizing column names for database...

  [DEBUG] Available columns in each dataframe:
    Dim_Customer: ['customer_id', 'full_name', 'city_id', 'city_name', 'region']
    Dim_Product: ['product_id', 'product_name', 'subcategory_id', 'subcategory_name', 'category_id', 'category_name', 'unit_price', 'unit_cost', 'competitor_price', 'price_difference', 'price_difference_pct', 'avg_sentiment', 'avg_rating', 'review_count']
    Dim_Store: ['store_id', 'store_name', 'city_id', 'city_name', 'region', 'monthly_target', 'annual_target', 'manager_name']
  Column mapping completed

  Final column counts:
    Dim_Customer: 4 columns
    Dim_Product: 7 columns
    Dim_Store: 

In [41]:
print("\n" + "="*70)
print("DATABASE CONNECTION TEST")
print("="*70)

try:
    import sqlite3
    
    db_path = DATABASE_DIR / 'techstore_dw.db'
    
    if db_path.exists():
        conn = sqlite3.connect(str(db_path))
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
        tables = cursor.fetchall()
        
        print(f"\nConnected: {db_path}")
        print(f"\nTables ({len(tables)}):")
        
        for table in tables:
            table_name = table[0]
            cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
            count = cursor.fetchone()[0]
            print(f"  {table_name:20} {count:>8,} rows")
        
        print("\nTesting Star Schema joins:")
        test_query = """
        SELECT 
            fs.Sale_ID,
            dd.Full_Date,
            dp.Product_Name,
            ds.Store_Name,
            dc.Customer_Name,
            fs.Total_Revenue,
            fs.Net_Profit
        FROM Fact_Sales fs
        JOIN Dim_Date dd ON fs.Date_ID = dd.Date_ID
        JOIN Dim_Product dp ON fs.Product_ID = dp.Product_ID
        JOIN Dim_Store ds ON fs.Store_ID = ds.Store_ID
        JOIN Dim_Customer dc ON fs.Customer_ID = dc.Customer_ID
        LIMIT 5
        """
        
        test_df = pd.read_sql(test_query, conn)
        print(test_df.to_string(index=False))
        print("\nJoins working correctly")
        
        conn.close()
    else:
        print(f"Database not found: {db_path}")
        
except Exception as e:
    print(f"Database test error: {e}")


DATABASE CONNECTION TEST

Connected: c:\Users\windows\TechStore\src\database\techstore_dw.db

Tables (5):
  Dim_Customer            1,110 rows
  Dim_Date                  731 rows
  Dim_Product                38 rows
  Dim_Store                  12 rows
  Fact_Sales             25,000 rows

Testing Star Schema joins:
 Sale_ID  Full_Date Product_Name                  Store_Name  Customer_Name  Total_Revenue  Net_Profit
       1 2024-07-02 Redmi Buds 4 TechStore Constantine Cirta    Ryad Benali         9000.0     1956.37
       2 2023-07-01 HP 650 Black       TechStore Ouargla Sud Mohamed Benali         2500.0      417.16
       4 2024-08-01 Redmi Buds 4        TechStore Oran Bahia Sarah Zerrouki         4500.0      364.51
       5 2023-04-30  Oppo Reno 8      TechStore Biskra Ziban   Amel Brahami        58000.0    19474.00
       6 2024-06-25 HP 650 Black       TechStore Ouargla Sud  Fares Khelifa         2500.0      437.77

Joins working correctly


## 4. Validation & Testing

In [42]:
print("\n" + "="*70)
print("DATA QUALITY VALIDATION")
print("="*70)

validation_results = []

for name, df in transformed_data.items():
    missing = df.isnull().sum().sum()
    missing_pct = (missing / (df.shape[0] * df.shape[1])) * 100
    
    if 'Customer_ID' in df.columns:
        duplicates = df['Customer_ID'].duplicated().sum()
    elif 'Product_ID' in df.columns:
        duplicates = df['Product_ID'].duplicated().sum()
    elif 'Store_ID' in df.columns:
        duplicates = df['Store_ID'].duplicated().sum()
    elif 'Date_ID' in df.columns:
        duplicates = df['Date_ID'].duplicated().sum()
    elif 'Sale_ID' in df.columns:
        duplicates = df['Sale_ID'].duplicated().sum()
    else:
        duplicates = 0
    
    validation_results.append({
        'Table': name,
        'Rows': len(df),
        'Columns': len(df.columns),
        'Missing_Values': missing,
        'Missing_Pct': f"{missing_pct:.2f}%",
        'Duplicate_Keys': duplicates,
        'Status': 'OK' if missing_pct < 5 and duplicates == 0 else 'WARNING'
    })

df_validation = pd.DataFrame(validation_results)
print("\n" + df_validation.to_string(index=False))


DATA QUALITY VALIDATION

       Table  Rows  Columns  Missing_Values Missing_Pct  Duplicate_Keys Status
Dim_Customer  1110        5               0       0.00%               0     OK
 Dim_Product    38       14               0       0.00%               0     OK
   Dim_Store    12        8               0       0.00%               0     OK
    Dim_Date   731       10               0       0.00%               0     OK
  Fact_Sales 25000       12               0       0.00%               0     OK


In [43]:
print("\n" + "="*70)
print("SQL QUERIES TEST")
print("="*70)

try:
    os.chdir(SCRIPTS_DIR)
    exec(open('test_queries.py').read())
    os.chdir(PROJECT_ROOT)
    
except Exception as e:
    print(f"SQL tests error: {e}")
    os.chdir(PROJECT_ROOT)


SQL QUERIES TEST
COMPREHENSIVE SQL QUERIES TEST
Connected to: ../database/techstore_dw.db

[1/13] Testing: Total Revenue
  Success: 1 rows, 1 columns
  Result:
 Total_Revenue
   859661700.0

[2/13] Testing: Net Profit
  Success: 1 rows, 1 columns
  Result:
  Net_Profit
259493425.49

[3/13] Testing: Total Transactions
  Success: 1 rows, 1 columns
  Result:
 Total_Transactions
              25000

[4/13] Testing: Avg Transaction Value
  Success: 1 rows, 1 columns
  Result:
 Avg_Transaction_Value
              34386.47

[5/13] Testing: Monthly Trends
  Success: 24 rows, 7 columns

[6/13] Testing: Top Selling Products
  Success: 15 rows, 6 columns

[7/13] Testing: Category Performance
  Success: 5 rows, 6 columns

[8/13] Testing: Store Ranking
  Success: 12 rows, 8 columns

[9/13] Testing: Regional Performance
  Success: 4 rows, 6 columns

[10/13] Testing: Top Customers
  Success: 20 rows, 7 columns

[11/13] Testing: Profit Margin by Category
  Success: 5 rows, 9 columns

[12/13] Testing:

## 5. Pipeline Summary

In [44]:
print("\n" + "="*70)
print("ETL PIPELINE COMPLETED")
print("="*70)

summary = {
    'Extraction': {
        'MySQL Tables': len([k for k in dataframes.keys() if k not in ['competitor_prices']]),
        'Competitor Prices': 'competitor_prices' in dataframes,
        'Legacy Invoices': (DATA_DIR / 'extracted' / 'legacy_sales.csv').exists()
    },
    'Transformation': {
        'Dimensions': len([k for k in transformed_data.keys() if k.startswith('Dim_')]),
        'Fact Table': 'Fact_Sales' in transformed_data
    },
    'Loading': {
        'Database': (DATABASE_DIR / 'techstore_dw.db').exists(),
        'Tables': len(tables) if 'tables' in locals() else 0
    }
}

print("\nEXTRACTION:")
for key, value in summary['Extraction'].items():
    status = "OK" if value else "SKIPPED"
    print(f"  {key}: {value} [{status}]")

print("\nTRANSFORMATION:")
for key, value in summary['Transformation'].items():
    status = "OK" if value else "FAILED"
    print(f"  {key}: {value} [{status}]")

print("\nLOADING:")
for key, value in summary['Loading'].items():
    status = "OK" if value else "FAILED"
    print(f"  {key}: {value} [{status}]")

print("\nNEXT STEPS:")
print("  1. Launch dashboard: streamlit run dashboard/dashboard_app.py")
print("  2. Analyze business metrics")
print("  3. Generate final report")

print("\n" + "="*70)
print("FILES GENERATED:")
print("="*70)
print(f"  Data Warehouse: {DATABASE_DIR / 'techstore_dw.db'}")
print(f"  Extracted Data: {EXTRACTED_DIR}/")
print(f"  Transformed Data: {TRANSFORMED_DIR}/")
print("\n" + "="*70)


ETL PIPELINE COMPLETED

EXTRACTION:
  MySQL Tables: 8 [OK]
  Competitor Prices: True [OK]
  Legacy Invoices: True [OK]

TRANSFORMATION:
  Dimensions: 4 [OK]
  Fact Table: True [OK]

LOADING:
  Database: True [OK]
  Tables: 5 [OK]

NEXT STEPS:
  1. Launch dashboard: streamlit run dashboard/dashboard_app.py
  2. Analyze business metrics
  3. Generate final report

FILES GENERATED:
  Data Warehouse: c:\Users\windows\TechStore\src\database\techstore_dw.db
  Extracted Data: c:\Users\windows\TechStore\src\Data\extracted/
  Transformed Data: c:\Users\windows\TechStore\src\Data\transformed/

