In [15]:
import pandas as pd
import pyodbc
import re
import os

# Error log destination; its parent directory is created up front so log_error can append straight away.
LOG_FILE = "logs/etl_error_log.txt"
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

In [16]:
def log_error(msg, log_file=None):
    """Append one message, as a single line, to the ETL error log.

    Args:
        msg: Message to record. It is formatted with ``str()``, so exception
            objects or other non-string values are accepted.
        log_file: Optional path override. When omitted, the module-level
            ``LOG_FILE`` is used.
    """
    path = LOG_FILE if log_file is None else log_file
    # Explicit UTF-8: the platform default encoding (e.g. cp1252 on Windows) cannot
    # encode every character that may appear in names/IDs echoed into error messages.
    with open(path, "a", encoding="utf-8") as log:
        log.write(f"{msg}\n")

In [17]:
def clean_name(name, max_length=100):
    """Normalise a customer name for loading.

    Keeps only alphanumeric and whitespace characters, trims surrounding
    whitespace, and truncates to ``max_length`` characters.

    Args:
        name: Raw name value; non-strings are converted with ``str()``.
        max_length: Maximum number of characters kept (default 100).

    Returns:
        The cleaned name, or ``"Unknown Name"`` when the input is missing
        (None / NaN / pd.NA) or nothing remains after cleaning.
    """
    try:
        # Missing values must be caught before str(): str(None) -> "None", str(nan) -> "nan".
        if pd.api.types.is_scalar(name) and pd.isna(name):
            return "Unknown Name"
        cleaned = ''.join(ch for ch in str(name) if ch.isalnum() or ch.isspace())
        # Strip before truncating, then again in case the cut lands after a space.
        cleaned = cleaned.strip()[:max_length].rstrip()
        return cleaned or "Unknown Name"
    except Exception as e:
        log_error(f"Name Cleaning Error: {e}")
        return "Unknown Name"

In [18]:
def clean_email(email):
    """Return a trimmed email address if it has a plausible format, else a placeholder.

    An address is accepted when, after stripping surrounding whitespace, it
    contains exactly one '@', no inner whitespace, and a '.' in the domain part.
    This is a shape check only, not full RFC 5322 validation.

    Args:
        email: Raw email value; non-strings (None, NaN, numbers) are rejected.

    Returns:
        The stripped address, or ``"invalid_email@example.com"`` if it fails the check.
    """
    try:
        if not isinstance(email, str):
            return "invalid_email@example.com"
        candidate = email.strip()
        if re.fullmatch(r"[^@\s]+@[^@\s]+\.[^@\s]+", candidate):
            return candidate
        return "invalid_email@example.com"
    except Exception as e:
        log_error(f"Email Cleaning Error: {e}")
        return "invalid_email@example.com"

In [19]:
def clean_customer_id(customer_id):
    """Collapse a customer ID that consists of one token repeated back-to-back.

    Duplicated IDs such as ``"C123C123"`` are reduced to ``"C123"``. Only a
    whole-ID repetition is collapsed. Doubled characters inside a legitimate
    ID (e.g. ``"C1001"`` or ``"AA12"``) are left untouched, so distinct
    customers are not merged onto one primary key.

    Args:
        customer_id: Raw ID value; converted with ``str()``.

    Returns:
        The collapsed ID when the whole string is a repetition of one token;
        otherwise the ID's string form. The original value is returned if
        cleaning raises.
    """
    try:
        text = str(customer_id)
        # Lazy unit + \1+ under fullmatch finds the shortest token whose repetition spans the whole ID.
        repeated = re.fullmatch(r"([A-Za-z0-9]+?)\1+", text)
        return repeated.group(1) if repeated else text
    except Exception as e:
        log_error(f"Customer ID Cleaning Error: {e}")
        return customer_id

In [20]:

# Input file location and target database. Both can be overridden through environment
# variables so the notebook is not tied to one machine; the defaults are the original values.
DATA_DIR = os.environ.get(
    "ETL_DATA_DIR", r"C:\Users\muham\Downloads\MidLevel_DataEngineer_Assessment"
)
SQL_CONN_STR = os.environ.get(
    "ETL_SQL_CONN_STR",
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=DATA_ENGINEER\\SQLEXPRESS;'
    'DATABASE=sale_db;'
    'Trusted_Connection=yes;',
)


def _db_value(value):
    """Return None for pandas missing markers (NaN, NaT, pd.NA) so pyodbc binds SQL NULL.

    Any other value is returned unchanged.
    """
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value


try:
    # --- Extract: read the raw JSON inputs ---------------------------------
    try:
        sales_data = pd.read_json(os.path.join(DATA_DIR, 'sales_data.json'))
        customer_data = pd.read_json(os.path.join(DATA_DIR, 'customer_data.json'))
    except Exception as e:
        log_error(f"File Read Error: {e}")
        raise

    # --- Transform: type coercion, derived columns, field cleaning ----------
    try:
        customer_data.dropna(subset=['customer_id'], inplace=True)
        # Unparseable dates become NaT here; _db_value turns them into NULL on insert.
        customer_data['join_date'] = pd.to_datetime(customer_data['join_date'], errors='coerce')
        customer_data['loyalty_points'] = customer_data['loyalty_points'].fillna(0)

        sales_data['date'] = pd.to_datetime(sales_data['date'], errors='coerce')
        # NOTE(review): total_price does not apply `discount`; confirm this is intended.
        sales_data['total_price'] = sales_data['quantity'] * sales_data['product'].apply(lambda p: p['price'])

        # Each sale embeds its product as a dict; flatten to one row per distinct product.
        products_data = pd.DataFrame([
            {
                'product_id': p['id'],
                'product_name': p['name'],
                'product_category': p['category'],
                'price': p['price']
            }
            for p in sales_data['product']
        ]).drop_duplicates('product_id')

        customer_data['customer_name'] = customer_data['customer_name'].apply(clean_name)
        customer_data['email'] = customer_data['email'].apply(clean_email)
        # NOTE(review): Transactions.customer_id is loaded uncleaned; confirm the two still join.
        customer_data['customer_id'] = customer_data['customer_id'].apply(clean_customer_id)

    except Exception as e:
        log_error(f"Data Transformation Error: {e}")
        raise

    # --- Connect --------------------------------------------------------------
    try:
        conn = pyodbc.connect(SQL_CONN_STR)
        cursor = conn.cursor()
    except Exception as e:
        log_error(f"Database Connection Error: {e}")
        raise

    # --- Load: create tables if missing, insert rows, commit once ------------
    try:
        create_table_queries = [
            '''
            IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Products' AND xtype='U')
            CREATE TABLE Products (
                product_id VARCHAR(50) PRIMARY KEY,
                product_name VARCHAR(255),
                product_category VARCHAR(255),
                price DECIMAL(10, 2)
            )
            ''',
            '''
            IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Transactions' AND xtype='U')
            CREATE TABLE Transactions (
                transaction_id VARCHAR(50) PRIMARY KEY,
                customer_id VARCHAR(50),
                product_id VARCHAR(50),
                quantity INT,
                discount DECIMAL(5,2),
                date DATETIME,
                region VARCHAR(50),
                total_price DECIMAL(10,2),
                FOREIGN KEY (product_id) REFERENCES Products(product_id)
            )
            ''',
            '''
            IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='Customers' AND xtype='U')
            CREATE TABLE Customers (
                customer_id VARCHAR(50) PRIMARY KEY,
                customer_name VARCHAR(1000),
                email VARCHAR(500),
                region VARCHAR(50),
                join_date DATETIME2,
                loyalty_points INT
            )
            '''
        ]

        for query in create_table_queries:
            cursor.execute(query)

        # Products first: Transactions.product_id has a foreign key onto Products.
        # Explicit column lists keep inserts correct even if an existing table's column order differs.
        for _, p in products_data.iterrows():
            cursor.execute("SELECT COUNT(1) FROM Products WHERE product_id=?", p['product_id'])
            if cursor.fetchone()[0]:
                log_error(f"Duplicate product_id skipped: {p['product_id']}")
                continue
            cursor.execute(
                "INSERT INTO Products (product_id, product_name, product_category, price) "
                "VALUES (?, ?, ?, ?)",
                _db_value(p['product_id']), _db_value(p['product_name']),
                _db_value(p['product_category']), _db_value(p['price'])
            )

        for _, t in sales_data.iterrows():
            cursor.execute("SELECT COUNT(1) FROM Transactions WHERE transaction_id=?", t['transaction_id'])
            if cursor.fetchone()[0]:
                log_error(f"Duplicate transaction_id skipped: {t['transaction_id']}")
                continue
            cursor.execute(
                "INSERT INTO Transactions "
                "(transaction_id, customer_id, product_id, quantity, discount, date, region, total_price) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                _db_value(t['transaction_id']), _db_value(t['customer_id']), _db_value(t['product']['id']),
                _db_value(t['quantity']), _db_value(t['discount']), _db_value(t['date']),
                _db_value(t['region']), _db_value(t['total_price'])
            )

        for _, c in customer_data.iterrows():
            cursor.execute("SELECT COUNT(1) FROM Customers WHERE customer_id=?", c['customer_id'])
            if cursor.fetchone()[0]:
                log_error(f"Duplicate customer_id skipped: {c['customer_id']}")
                continue
            cursor.execute(
                "INSERT INTO Customers "
                "(customer_id, customer_name, email, region, join_date, loyalty_points) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                _db_value(c['customer_id']), _db_value(c['customer_name']), _db_value(c['email']),
                _db_value(c['region']), _db_value(c['join_date']), _db_value(c['loyalty_points'])
            )

        conn.commit()

    except Exception as e:
        log_error(f"Database Operation Error: {e}")
        # Discard the partial load explicitly instead of relying on close() to roll back.
        try:
            conn.rollback()
        except pyodbc.Error as rollback_error:
            log_error(f"Rollback Error: {rollback_error}")
        raise
    finally:
        cursor.close()
        conn.close()

    print("ETL Process Completed Successfully!")

except Exception as e:
    log_error(f"General ETL Error: {e}")
    print("ETL Process Failed! Check logs for details.")


ETL Process Completed Successfully!
