# Week 6 - SQL and Python Integration Part 1: Database Connections

## Learning Objectives
By the end of this lesson, you will be able to:
1. Establish database connections from Python using SQLAlchemy
2. Load Olist e-commerce data into SQLite for SQL analysis
3. Execute basic SQL queries from Python notebooks
4. Understand the relationship between SQL databases and Python DataFrames
5. Implement proper connection management and error handling

## Business Context: Bridging SQL and Python

In modern business environments, data often lives in **SQL databases** while analysis happens in **Python**. The ability to seamlessly bridge these two worlds is essential for:

- **Data Extraction** - Pull specific datasets from enterprise databases
- **Performance** - Leverage database engines for heavy computation
- **Real-time Analysis** - Connect to live business systems
- **Scalability** - Handle datasets too large for memory
- **Integration** - Combine SQL's querying power with Python's analytical capabilities

Today we'll master the fundamental skills of connecting Python to databases and working with real business data using both SQL and pandas approaches.

In [None]:
# Import required libraries for database connectivity
import pandas as pd
import numpy as np
import sqlite3
import sqlalchemy
from sqlalchemy import create_engine, text, inspect
import zipfile
import io
import requests
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Database configuration
DATABASE_CONFIG = {
    'sqlite_path': 'olist_ecommerce.db',
    'connection_timeout': 30,
    'echo': False  # Set to True to see SQL queries
}

print("üóÑÔ∏è SQL-Python Integration Environment Ready!")
print(f"SQLAlchemy version: {sqlalchemy.__version__}")
print(f"Pandas version: {pd.__version__}")
print("Ready to bridge SQL and Python for business analytics!")

## 1. Loading Business Data into Database

First, let's load our Olist e-commerce data into a SQLite database. This simulates the common scenario where business data resides in a database system.

In [None]:
def load_olist_data_to_database():
    """
    Load Olist e-commerce data from zip file and store in SQLite database.
    This simulates real-world scenario where business data resides in databases.
    """
    try:
        # Download the Olist dataset
        print("üì• Downloading Olist e-commerce dataset...")
        zip_url = "https://github.com/autom8or-com/python-data-analysis-course/raw/main/Resources/data/sales.zip"
        response = requests.get(zip_url)
        response.raise_for_status()
        
        # Create database connection
        print("üóÑÔ∏è Creating SQLite database connection...")
        engine = create_engine(f"sqlite:///{DATABASE_CONFIG['sqlite_path']}", echo=DATABASE_CONFIG['echo'])
        
        datasets = {}
        table_info = {}
        
        with zipfile.ZipFile(io.BytesIO(response.content)) as zip_file:
            print("üìä Extracting and loading tables into database...")
            
            # Define table mapping with business-friendly names
            table_mapping = {
                'customers': 'olist_customers_dataset.csv',
                'orders': 'olist_orders_dataset.csv',
                'order_items': 'olist_order_items_dataset.csv',
                'products': 'olist_products_dataset.csv',
                'sellers': 'olist_sellers_dataset.csv',
                'payments': 'olist_order_payments_dataset.csv',
                'reviews': 'olist_order_reviews_dataset.csv',
                'geolocation': 'olist_geolocation_dataset.csv'
            }
            
            for table_name, filename in table_mapping.items():
                try:
                    # Load CSV data
                    df = pd.read_csv(zip_file.open(filename))
                    
                    # Data cleaning and optimization
                    df = clean_data_for_database(df, table_name)
                    
                    # Store in database
                    df.to_sql(table_name, engine, if_exists='replace', index=False, method='multi')
                    
                    # Track table information
                    table_info[table_name] = {
                        'rows': len(df),
                        'columns': list(df.columns),
                        'memory_usage': df.memory_usage().sum()
                    }
                    
                    datasets[table_name] = df
                    print(f"  ‚úì {table_name}: {len(df):,} rows, {len(df.columns)} columns")
                    
                except KeyError:
                    print(f"  ‚ö† {filename} not found in zip file")
                    continue
                except Exception as e:
                    print(f"  ‚ùå Error loading {table_name}: {e}")
                    continue
        
        # Create indexes for better query performance
        create_database_indexes(engine)
        
        print(f"\n‚úÖ Database setup complete!")
        print(f"üìÅ Database file: {DATABASE_CONFIG['sqlite_path']}")
        print(f"üìä Tables loaded: {len(table_info)}")
        
        return engine, datasets, table_info
        
    except Exception as e:
        print(f"‚ùå Error setting up database: {e}")
        return create_sample_database()

def clean_data_for_database(df, table_name):
    """
    Clean and optimize data for database storage.
    """
    # Convert date columns
    date_columns = [col for col in df.columns if 'date' in col.lower() or 'timestamp' in col.lower()]
    for col in date_columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')
    
    # Handle missing values
    for col in df.columns:
        if df[col].dtype == 'object':
            df[col] = df[col].fillna('Unknown')
        else:
            df[col] = df[col].fillna(0)
    
    # Optimize data types for storage
    for col in df.columns:
        if df[col].dtype == 'float64':
            if df[col].max() < 3.4e38 and df[col].min() > -3.4e38:  # Float32 range
                df[col] = df[col].astype('float32')
        elif df[col].dtype == 'int64':
            if df[col].max() < 2147483647 and df[col].min() > -2147483648:  # Int32 range
                df[col] = df[col].astype('int32')
    
    return df

def create_database_indexes(engine):
    """
    Create indexes for better query performance on key business fields.
    """
    with engine.connect() as conn:
        # Primary business entity indexes
        indexes = [
            "CREATE INDEX IF NOT EXISTS idx_orders_customer_id ON orders(customer_id)",
            "CREATE INDEX IF NOT EXISTS idx_orders_purchase_date ON orders(order_purchase_timestamp)",
            "CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(order_status)",
            "CREATE INDEX IF NOT EXISTS idx_order_items_order_id ON order_items(order_id)",
            "CREATE INDEX IF NOT EXISTS idx_order_items_product_id ON order_items(product_id)",
            "CREATE INDEX IF NOT EXISTS idx_customers_state ON customers(customer_state)",
            "CREATE INDEX IF NOT EXISTS idx_products_category ON products(product_category_name)",
            "CREATE INDEX IF NOT EXISTS idx_reviews_order_id ON reviews(order_id)",
            "CREATE INDEX IF NOT EXISTS idx_reviews_score ON reviews(review_score)"
        ]
        
        for index_sql in indexes:
            try:
                conn.execute(text(index_sql))
            except Exception as e:
                print(f"Warning: Could not create index: {e}")
        
        conn.commit()
    
    print("üóÉÔ∏è Database indexes created for optimal query performance")

def create_sample_database():
    """
    Create sample database if real data loading fails.
    """
    print("üìù Creating sample database for demonstration...")
    
    engine = create_engine(f"sqlite:///{DATABASE_CONFIG['sqlite_path']}", echo=DATABASE_CONFIG['echo'])
    
    # Generate realistic sample data
    np.random.seed(42)
    
    # Sample customers
    n_customers = 5000
    customers = pd.DataFrame({
        'customer_id': [f'cust_{i:06d}' for i in range(n_customers)],
        'customer_unique_id': [f'uniq_{i:06d}' for i in range(n_customers)],
        'customer_zip_code_prefix': np.random.randint(10000, 99999, n_customers),
        'customer_city': np.random.choice(['S√£o Paulo', 'Rio de Janeiro', 'Belo Horizonte', 
                                         'Porto Alegre', 'Curitiba', 'Salvador'], n_customers),
        'customer_state': np.random.choice(['SP', 'RJ', 'MG', 'RS', 'PR', 'BA'], n_customers)
    })
    
    # Sample orders with business patterns
    n_orders = 8000
    start_date = datetime(2017, 1, 1)
    end_date = datetime(2018, 8, 31)
    
    orders = pd.DataFrame({
        'order_id': [f'ord_{i:08d}' for i in range(n_orders)],
        'customer_id': np.random.choice(customers['customer_id'], n_orders),
        'order_status': np.random.choice(['delivered', 'shipped', 'processing', 'canceled'], 
                                       n_orders, p=[0.85, 0.08, 0.05, 0.02]),
        'order_purchase_timestamp': pd.date_range(start_date, end_date, periods=n_orders),
        'order_approved_at': pd.date_range(start_date, end_date, periods=n_orders),
        'order_delivered_carrier_date': pd.date_range(start_date + timedelta(days=2), 
                                                    end_date + timedelta(days=5), periods=n_orders),
        'order_delivered_customer_date': pd.date_range(start_date + timedelta(days=5), 
                                                     end_date + timedelta(days=10), periods=n_orders),
        'order_estimated_delivery_date': pd.date_range(start_date + timedelta(days=10), 
                                                      end_date + timedelta(days=15), periods=n_orders)
    })
    
    # Sample products
    categories = ['health_beauty', 'computers_accessories', 'furniture_decor', 'sports_leisure',
                 'housewares', 'watches_gifts', 'telephony', 'auto', 'toys', 'cool_stuff']
    
    n_products = 3000
    products = pd.DataFrame({
        'product_id': [f'prod_{i:06d}' for i in range(n_products)],
        'product_category_name': np.random.choice(categories, n_products),
        'product_name_lenght': np.random.randint(10, 100, n_products),
        'product_description_lenght': np.random.randint(50, 500, n_products),
        'product_photos_qty': np.random.randint(1, 10, n_products),
        'product_weight_g': np.random.lognormal(6, 1, n_products),
        'product_length_cm': np.random.lognormal(3, 0.5, n_products),
        'product_height_cm': np.random.lognormal(2.5, 0.5, n_products),
        'product_width_cm': np.random.lognormal(3, 0.5, n_products)
    })
    
    # Sample order items
    n_items = 12000
    order_items = pd.DataFrame({
        'order_id': np.random.choice(orders['order_id'], n_items),
        'order_item_id': np.random.randint(1, 5, n_items),
        'product_id': np.random.choice(products['product_id'], n_items),
        'seller_id': [f'sell_{i:04d}' for i in np.random.randint(0, 500, n_items)],
        'shipping_limit_date': pd.date_range(start_date, end_date, periods=n_items),
        'price': np.random.lognormal(3.5, 0.8, n_items),
        'freight_value': np.random.exponential(15, n_items)
    })
    
    # Sample reviews
    n_reviews = int(n_orders * 0.8)
    reviews = pd.DataFrame({
        'review_id': [f'rev_{i:08d}' for i in range(n_reviews)],
        'order_id': np.random.choice(orders['order_id'], n_reviews),
        'review_score': np.random.choice([1, 2, 3, 4, 5], n_reviews, p=[0.05, 0.08, 0.15, 0.32, 0.40]),
        'review_comment_title': ['Sample Title'] * n_reviews,
        'review_comment_message': ['Sample Comment'] * n_reviews,
        'review_creation_date': pd.date_range(start_date, end_date, periods=n_reviews),
        'review_answer_timestamp': pd.date_range(start_date, end_date, periods=n_reviews)
    })
    
    # Store all tables in database
    datasets = {
        'customers': customers,
        'orders': orders,
        'products': products,
        'order_items': order_items,
        'reviews': reviews
    }
    
    table_info = {}
    
    for table_name, df in datasets.items():
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        table_info[table_name] = {
            'rows': len(df),
            'columns': list(df.columns),
            'memory_usage': df.memory_usage().sum()
        }
        print(f"  ‚úì {table_name}: {len(df):,} rows")
    
    create_database_indexes(engine)
    
    return engine, datasets, table_info

# Load data into database
print("üöÄ Setting up Olist E-commerce Database...")
db_engine, raw_datasets, table_info = load_olist_data_to_database()

print(f"\nüìã Database Summary:")
for table, info in table_info.items():
    print(f"  üìä {table}: {info['rows']:,} rows, {len(info['columns'])} columns")

## 2. Database Connection Management

Proper connection management is crucial for production applications. Let's explore different ways to connect to and interact with our database.

In [None]:
class DatabaseManager:
    """
    Professional database connection manager with proper error handling,
    connection pooling, and resource management.
    """
    
    def __init__(self, database_url=None):
        self.database_url = database_url or f"sqlite:///{DATABASE_CONFIG['sqlite_path']}"
        self.engine = None
        self._connect()
    
    def _connect(self):
        """
        Establish database connection with proper configuration.
        """
        try:
            self.engine = create_engine(
                self.database_url,
                echo=DATABASE_CONFIG['echo'],
                pool_timeout=DATABASE_CONFIG['connection_timeout'],
                pool_recycle=3600  # Recycle connections every hour
            )
            
            # Test connection
            with self.engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            
            print("‚úÖ Database connection established successfully")
            
        except Exception as e:
            print(f"‚ùå Database connection failed: {e}")
            raise
    
    def get_table_info(self):
        """
        Get comprehensive information about all tables in the database.
        """
        inspector = inspect(self.engine)
        tables = inspector.get_table_names()
        
        table_info = {}
        
        for table in tables:
            with self.engine.connect() as conn:
                # Get row count
                result = conn.execute(text(f"SELECT COUNT(*) FROM {table}"))
                row_count = result.scalar()
                
                # Get column information
                columns = inspector.get_columns(table)
                
                table_info[table] = {
                    'rows': row_count,
                    'columns': [col['name'] for col in columns],
                    'column_types': {col['name']: str(col['type']) for col in columns}
                }
        
        return table_info
    
    def execute_query(self, query, params=None):
        """
        Execute a SQL query with proper error handling and return results as DataFrame.
        """
        try:
            with self.engine.connect() as conn:
                if params:
                    result = pd.read_sql(text(query), conn, params=params)
                else:
                    result = pd.read_sql(text(query), conn)
                
                return result
                
        except Exception as e:
            print(f"‚ùå Query execution failed: {e}")
            print(f"üìù Query: {query[:100]}...")
            raise
    
    def get_sample_data(self, table_name, limit=5):
        """
        Get sample data from a table for exploration.
        """
        query = f"SELECT * FROM {table_name} LIMIT {limit}"
        return self.execute_query(query)
    
    def get_table_schema(self, table_name):
        """
        Get detailed schema information for a specific table.
        """
        inspector = inspect(self.engine)
        columns = inspector.get_columns(table_name)
        
        schema_df = pd.DataFrame([
            {
                'column_name': col['name'],
                'data_type': str(col['type']),
                'nullable': col['nullable'],
                'default': col.get('default'),
                'primary_key': col.get('primary_key', False)
            }
            for col in columns
        ])
        
        return schema_df
    
    def close(self):
        """
        Properly close database connections.
        """
        if self.engine:
            self.engine.dispose()
            print("üîí Database connections closed")

# Create database manager instance
db = DatabaseManager()

# Display database information
print("\nüìä Database Overview:")
db_info = db.get_table_info()

for table_name, info in db_info.items():
    print(f"\nüóÉÔ∏è Table: {table_name}")
    print(f"   üìè Rows: {info['rows']:,}")
    print(f"   üìã Columns: {len(info['columns'])}")
    print(f"   üè∑Ô∏è Key columns: {', '.join(info['columns'][:5])}{'...' if len(info['columns']) > 5 else ''}")

## 3. Basic SQL Queries from Python

Now let's execute SQL queries directly from Python and see how they compare to pandas operations.

In [None]:
# Let's start with basic data exploration using SQL
print("üîç Data Exploration: SQL vs Pandas Comparison")
print("\n" + "="*60)

# Example 1: Basic SELECT statement
print("\nüìã Example 1: Basic Data Selection")
print("SQL Approach:")

# SQL query
sql_customers = db.execute_query("""
    SELECT customer_id, customer_city, customer_state 
    FROM customers 
    LIMIT 5
""")

print("SQL Result:")
display(sql_customers)

print("\nPandas Equivalent:")
pandas_customers = raw_datasets['customers'][['customer_id', 'customer_city', 'customer_state']].head(5)
display(pandas_customers)

print("\n" + "-"*60)

In [None]:
# Example 2: Filtering and counting
print("\nüìä Example 2: Filtering and Aggregation")
print("Business Question: How many orders by status?")

print("\nSQL Approach:")
sql_order_status = db.execute_query("""
    SELECT 
        order_status,
        COUNT(*) as order_count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM orders
    GROUP BY order_status
    ORDER BY order_count DESC
""")

print("SQL Result:")
display(sql_order_status)

print("\nPandas Equivalent:")
pandas_order_status = raw_datasets['orders']['order_status'].value_counts().reset_index()
pandas_order_status.columns = ['order_status', 'order_count']
pandas_order_status['percentage'] = (pandas_order_status['order_count'] / pandas_order_status['order_count'].sum() * 100).round(2)
display(pandas_order_status)

print("\nüí° Observation: SQL provides more concise syntax for window functions!")
print("\n" + "-"*60)

In [None]:
# Example 3: Complex business analysis with joins
print("\nüíº Example 3: Business Analysis with Multiple Tables")
print("Business Question: Revenue by product category")

print("\nSQL Approach (with JOINs):")
sql_category_revenue = db.execute_query("""
    SELECT 
        p.product_category_name,
        COUNT(DISTINCT oi.order_id) as total_orders,
        SUM(oi.price) as total_revenue,
        ROUND(AVG(oi.price), 2) as avg_item_price,
        SUM(oi.freight_value) as total_freight
    FROM order_items oi
    JOIN products p ON oi.product_id = p.product_id
    GROUP BY p.product_category_name
    ORDER BY total_revenue DESC
    LIMIT 10
""")

print("SQL Result:")
display(sql_category_revenue)

print("\nPandas Equivalent (with merge):")
pandas_category_revenue = (
    raw_datasets['order_items']
    .merge(raw_datasets['products'], on='product_id')
    .groupby('product_category_name')
    .agg({
        'order_id': 'nunique',
        'price': ['sum', 'mean'],
        'freight_value': 'sum'
    })
    .round(2)
)

# Flatten column names
pandas_category_revenue.columns = ['total_orders', 'total_revenue', 'avg_item_price', 'total_freight']
pandas_category_revenue = pandas_category_revenue.sort_values('total_revenue', ascending=False).head(10).reset_index()

display(pandas_category_revenue)

print("\nüí° Observation: SQL JOINs are often more readable than pandas merges!")
print("\n" + "-"*60)

## 4. Advanced SQL Queries for Business Intelligence

Let's explore more sophisticated SQL queries that are common in business intelligence scenarios.

In [None]:
# Business Intelligence Query 1: Customer Lifetime Value Analysis
print("üí∞ Business Intelligence Analysis 1: Customer Lifetime Value")
print("\nSQL Query: Customer spending patterns with statistical analysis")

customer_ltv_sql = """
WITH customer_metrics AS (
    SELECT 
        o.customer_id,
        c.customer_state,
        COUNT(DISTINCT o.order_id) as total_orders,
        SUM(oi.price + oi.freight_value) as total_spent,
        AVG(oi.price + oi.freight_value) as avg_order_value,
        MIN(o.order_purchase_timestamp) as first_order_date,
        MAX(o.order_purchase_timestamp) as last_order_date,
        JULIANDAY(MAX(o.order_purchase_timestamp)) - JULIANDAY(MIN(o.order_purchase_timestamp)) as customer_lifetime_days
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    JOIN order_items oi ON o.order_id = oi.order_id
    WHERE o.order_status = 'delivered'
    GROUP BY o.customer_id, c.customer_state
),
customer_segments AS (
    SELECT *,
        CASE 
            WHEN total_spent >= 500 THEN 'High Value'
            WHEN total_spent >= 200 THEN 'Medium Value'
            ELSE 'Low Value'
        END as value_segment,
        CASE 
            WHEN total_orders >= 3 THEN 'Frequent'
            WHEN total_orders >= 2 THEN 'Regular'
            ELSE 'One-time'
        END as frequency_segment
    FROM customer_metrics
)
SELECT 
    customer_state,
    value_segment,
    frequency_segment,
    COUNT(*) as customer_count,
    ROUND(AVG(total_spent), 2) as avg_customer_value,
    ROUND(AVG(total_orders), 2) as avg_orders_per_customer,
    ROUND(AVG(customer_lifetime_days), 1) as avg_lifetime_days
FROM customer_segments
GROUP BY customer_state, value_segment, frequency_segment
ORDER BY customer_state, avg_customer_value DESC
"""

customer_ltv_results = db.execute_query(customer_ltv_sql)
print("\nüìä Customer Lifetime Value Analysis Results:")
display(customer_ltv_results.head(15))

print(f"\nüìà Key Insights:")
total_customers = customer_ltv_results['customer_count'].sum()
high_value_customers = customer_ltv_results[customer_ltv_results['value_segment'] == 'High Value']['customer_count'].sum()
print(f"  ‚Ä¢ Total customer segments analyzed: {total_customers:,}")
print(f"  ‚Ä¢ High-value customers: {high_value_customers:,} ({high_value_customers/total_customers*100:.1f}%)")
print(f"  ‚Ä¢ Average customer value across all segments: ${customer_ltv_results['avg_customer_value'].mean():.2f}")

In [None]:
# Business Intelligence Query 2: Sales Performance Trends
print("\nüìà Business Intelligence Analysis 2: Sales Performance Trends")
print("\nSQL Query: Monthly sales trends with growth calculations")

sales_trends_sql = """
WITH monthly_sales AS (
    SELECT 
        strftime('%Y', o.order_purchase_timestamp) as year,
        strftime('%m', o.order_purchase_timestamp) as month,
        COUNT(DISTINCT o.order_id) as total_orders,
        COUNT(DISTINCT o.customer_id) as unique_customers,
        SUM(oi.price) as total_revenue,
        SUM(oi.freight_value) as total_freight,
        ROUND(AVG(oi.price), 2) as avg_order_value
    FROM orders o
    JOIN order_items oi ON o.order_id = oi.order_id
    WHERE o.order_status = 'delivered'
        AND o.order_purchase_timestamp IS NOT NULL
    GROUP BY strftime('%Y', o.order_purchase_timestamp), strftime('%m', o.order_purchase_timestamp)
),
sales_with_growth AS (
    SELECT *,
        total_revenue + total_freight as total_value,
        LAG(total_revenue, 1) OVER (ORDER BY year, month) as prev_month_revenue,
        LAG(total_orders, 1) OVER (ORDER BY year, month) as prev_month_orders
    FROM monthly_sales
)
SELECT 
    year || '-' || printf('%02d', month) as period,
    total_orders,
    unique_customers,
    ROUND(total_revenue, 2) as revenue,
    ROUND(total_value, 2) as total_value,
    avg_order_value,
    CASE 
        WHEN prev_month_revenue IS NOT NULL THEN 
            ROUND((total_revenue - prev_month_revenue) / prev_month_revenue * 100, 2)
        ELSE NULL 
    END as revenue_growth_pct,
    CASE 
        WHEN prev_month_orders IS NOT NULL THEN 
            ROUND((total_orders - prev_month_orders) / CAST(prev_month_orders AS FLOAT) * 100, 2)
        ELSE NULL 
    END as order_growth_pct
FROM sales_with_growth
ORDER BY year, month
"""

sales_trends_results = db.execute_query(sales_trends_sql)
print("\nüìä Monthly Sales Performance Trends:")
display(sales_trends_results)

# Calculate key performance metrics
avg_growth = sales_trends_results['revenue_growth_pct'].dropna().mean()
best_month = sales_trends_results.loc[sales_trends_results['revenue'].idxmax()]
total_revenue = sales_trends_results['revenue'].sum()

print(f"\nüìà Performance Summary:")
print(f"  ‚Ä¢ Total revenue across all periods: ${total_revenue:,.2f}")
print(f"  ‚Ä¢ Average monthly revenue growth: {avg_growth:.2f}%")
print(f"  ‚Ä¢ Best performing month: {best_month['period']} (${best_month['revenue']:,.2f})")
print(f"  ‚Ä¢ Total unique customers served: {sales_trends_results['unique_customers'].sum():,}")

In [None]:
# Business Intelligence Query 3: Product Performance Analysis
print("\nüõçÔ∏è Business Intelligence Analysis 3: Product Performance Matrix")
print("\nSQL Query: Product category performance with customer satisfaction metrics")

product_performance_sql = """
WITH product_sales AS (
    SELECT 
        p.product_category_name,
        COUNT(DISTINCT oi.order_id) as total_orders,
        COUNT(DISTINCT o.customer_id) as unique_customers,
        SUM(oi.price) as total_revenue,
        AVG(oi.price) as avg_item_price,
        SUM(oi.freight_value) as total_freight,
        AVG(oi.freight_value) as avg_freight_cost
    FROM order_items oi
    JOIN products p ON oi.product_id = p.product_id
    JOIN orders o ON oi.order_id = o.order_id
    WHERE o.order_status = 'delivered'
    GROUP BY p.product_category_name
),
product_reviews AS (
    SELECT 
        p.product_category_name,
        AVG(CAST(r.review_score AS FLOAT)) as avg_rating,
        COUNT(r.review_id) as total_reviews,
        SUM(CASE WHEN r.review_score >= 4 THEN 1 ELSE 0 END) * 100.0 / COUNT(r.review_id) as satisfaction_rate
    FROM reviews r
    JOIN orders o ON r.order_id = o.order_id
    JOIN order_items oi ON o.order_id = oi.order_id
    JOIN products p ON oi.product_id = p.product_id
    GROUP BY p.product_category_name
),
category_performance AS (
    SELECT 
        ps.*,
        COALESCE(pr.avg_rating, 0) as avg_rating,
        COALESCE(pr.total_reviews, 0) as total_reviews,
        COALESCE(pr.satisfaction_rate, 0) as satisfaction_rate,
        (ps.total_revenue / ps.total_orders) as revenue_per_order,
        (ps.total_revenue / ps.unique_customers) as revenue_per_customer
    FROM product_sales ps
    LEFT JOIN product_reviews pr ON ps.product_category_name = pr.product_category_name
)
SELECT 
    product_category_name,
    total_orders,
    unique_customers,
    ROUND(total_revenue, 2) as total_revenue,
    ROUND(avg_item_price, 2) as avg_item_price,
    ROUND(avg_freight_cost, 2) as avg_freight_cost,
    ROUND(avg_rating, 2) as avg_rating,
    total_reviews,
    ROUND(satisfaction_rate, 1) as satisfaction_rate,
    ROUND(revenue_per_order, 2) as revenue_per_order,
    ROUND(revenue_per_customer, 2) as revenue_per_customer,
    CASE 
        WHEN total_revenue >= 50000 AND avg_rating >= 4.0 THEN 'Star Performer'
        WHEN total_revenue >= 50000 AND avg_rating < 4.0 THEN 'High Volume, Low Satisfaction'
        WHEN total_revenue < 50000 AND avg_rating >= 4.0 THEN 'High Satisfaction, Low Volume'
        ELSE 'Needs Attention'
    END as performance_category
FROM category_performance
ORDER BY total_revenue DESC
"""

product_performance_results = db.execute_query(product_performance_sql)
print("\nüìä Product Category Performance Matrix:")
display(product_performance_results)

# Analyze performance categories
performance_summary = product_performance_results['performance_category'].value_counts()
print(f"\nüéØ Performance Category Distribution:")
for category, count in performance_summary.items():
    print(f"  ‚Ä¢ {category}: {count} categories")

# Top performers
top_revenue = product_performance_results.nlargest(3, 'total_revenue')
top_satisfaction = product_performance_results.nlargest(3, 'avg_rating')

print(f"\nüèÜ Top 3 Categories by Revenue:")
for _, row in top_revenue.iterrows():
    print(f"  ‚Ä¢ {row['product_category_name']}: ${row['total_revenue']:,.2f} (Rating: {row['avg_rating']:.2f})")

print(f"\n‚≠ê Top 3 Categories by Customer Satisfaction:")
for _, row in top_satisfaction.iterrows():
    print(f"  ‚Ä¢ {row['product_category_name']}: {row['avg_rating']:.2f} rating (Revenue: ${row['total_revenue']:,.2f})")

## 5. SQL vs Pandas: Performance and Use Case Comparison

Let's compare the performance and use cases of SQL versus pandas for different types of operations.

In [None]:
import time

def compare_sql_vs_pandas():
    """
    Compare performance and readability of SQL vs pandas operations.
    """
    print("‚ö° SQL vs Pandas Performance Comparison")
    print("\n" + "="*60)
    
    # Test 1: Simple aggregation
    print("\nüìä Test 1: Simple Aggregation (Order counts by state)")
    
    # SQL approach
    start_time = time.time()
    sql_result = db.execute_query("""
        SELECT c.customer_state, COUNT(o.order_id) as order_count
        FROM orders o
        JOIN customers c ON o.customer_id = c.customer_id
        GROUP BY c.customer_state
        ORDER BY order_count DESC
    """)
    sql_time = time.time() - start_time
    
    # Pandas approach
    start_time = time.time()
    pandas_result = (
        raw_datasets['orders']
        .merge(raw_datasets['customers'], on='customer_id')
        .groupby('customer_state')['order_id']
        .count()
        .sort_values(ascending=False)
        .reset_index()
    )
    pandas_result.columns = ['customer_state', 'order_count']
    pandas_time = time.time() - start_time
    
    print(f"  SQL Time: {sql_time:.4f} seconds")
    print(f"  Pandas Time: {pandas_time:.4f} seconds")
    print(f"  Winner: {'SQL' if sql_time < pandas_time else 'Pandas'} ({min(sql_time, pandas_time)/max(sql_time, pandas_time)*100:.1f}% faster)")
    
    # Test 2: Complex multi-table join with calculations
    print("\nüíº Test 2: Complex Business Query (Revenue by category with metrics)")
    
    # SQL approach
    start_time = time.time()
    sql_complex = db.execute_query("""
        SELECT 
            p.product_category_name,
            COUNT(DISTINCT o.order_id) as orders,
            SUM(oi.price + oi.freight_value) as total_value,
            AVG(oi.price + oi.freight_value) as avg_order_value,
            COUNT(DISTINCT o.customer_id) as unique_customers
        FROM orders o
        JOIN order_items oi ON o.order_id = oi.order_id
        JOIN products p ON oi.product_id = p.product_id
        WHERE o.order_status = 'delivered'
        GROUP BY p.product_category_name
        ORDER BY total_value DESC
        LIMIT 10
    """)
    sql_complex_time = time.time() - start_time
    
    # Pandas approach
    start_time = time.time()
    pandas_complex = (
        raw_datasets['orders']
        [raw_datasets['orders']['order_status'] == 'delivered']
        .merge(raw_datasets['order_items'], on='order_id')
        .merge(raw_datasets['products'], on='product_id')
    )
    pandas_complex['total_item_value'] = pandas_complex['price'] + pandas_complex['freight_value']
    
    pandas_complex_result = (
        pandas_complex
        .groupby('product_category_name')
        .agg({
            'order_id': ['nunique', 'count'],
            'total_item_value': ['sum', 'mean'],
            'customer_id': 'nunique'
        })
    )
    pandas_complex_result.columns = ['orders', 'items', 'total_value', 'avg_order_value', 'unique_customers']
    pandas_complex_result = pandas_complex_result.sort_values('total_value', ascending=False).head(10)
    pandas_complex_time = time.time() - start_time
    
    print(f"  SQL Time: {sql_complex_time:.4f} seconds")
    print(f"  Pandas Time: {pandas_complex_time:.4f} seconds")
    print(f"  Winner: {'SQL' if sql_complex_time < pandas_complex_time else 'Pandas'} ({min(sql_complex_time, pandas_complex_time)/max(sql_complex_time, pandas_complex_time)*100:.1f}% faster)")
    
    # Test 3: Window functions and advanced analytics
    print("\nüìà Test 3: Advanced Analytics (Running totals and rankings)")
    
    # SQL approach (much more concise for window functions)
    start_time = time.time()
    sql_window = db.execute_query("""
        WITH monthly_revenue AS (
            SELECT 
                strftime('%Y-%m', o.order_purchase_timestamp) as month,
                SUM(oi.price) as monthly_revenue
            FROM orders o
            JOIN order_items oi ON o.order_id = oi.order_id
            WHERE o.order_status = 'delivered'
            GROUP BY strftime('%Y-%m', o.order_purchase_timestamp)
        )
        SELECT 
            month,
            monthly_revenue,
            SUM(monthly_revenue) OVER (ORDER BY month) as running_total,
            LAG(monthly_revenue, 1) OVER (ORDER BY month) as prev_month,
            RANK() OVER (ORDER BY monthly_revenue DESC) as revenue_rank
        FROM monthly_revenue
        ORDER BY month
    """)
    sql_window_time = time.time() - start_time
    
    # Pandas approach (more verbose for window functions)
    start_time = time.time()
    pandas_window_data = (
        raw_datasets['orders']
        [raw_datasets['orders']['order_status'] == 'delivered']
        .merge(raw_datasets['order_items'], on='order_id')
    )
    pandas_window_data['order_purchase_timestamp'] = pd.to_datetime(pandas_window_data['order_purchase_timestamp'])
    pandas_window_data['month'] = pandas_window_data['order_purchase_timestamp'].dt.to_period('M')
    
    monthly_pandas = pandas_window_data.groupby('month')['price'].sum().reset_index()
    monthly_pandas.columns = ['month', 'monthly_revenue']
    monthly_pandas['month'] = monthly_pandas['month'].astype(str)
    monthly_pandas = monthly_pandas.sort_values('month')
    
    monthly_pandas['running_total'] = monthly_pandas['monthly_revenue'].cumsum()
    monthly_pandas['prev_month'] = monthly_pandas['monthly_revenue'].shift(1)
    monthly_pandas['revenue_rank'] = monthly_pandas['monthly_revenue'].rank(ascending=False, method='min')
    pandas_window_time = time.time() - start_time
    
    print(f"  SQL Time: {sql_window_time:.4f} seconds")
    print(f"  Pandas Time: {pandas_window_time:.4f} seconds")
    print(f"  Winner: {'SQL' if sql_window_time < pandas_window_time else 'Pandas'} ({min(sql_window_time, pandas_window_time)/max(sql_window_time, pandas_window_time)*100:.1f}% faster)")
    
    print("\nüìã Sample Results from Advanced Analytics:")
    display(sql_window.head(8))
    
    return {
        'simple_agg': {'sql': sql_time, 'pandas': pandas_time},
        'complex_join': {'sql': sql_complex_time, 'pandas': pandas_complex_time},
        'window_functions': {'sql': sql_window_time, 'pandas': pandas_window_time}
    }

# Run performance comparison
performance_results = compare_sql_vs_pandas()

print("\nüéØ Performance Summary:")
for test_name, times in performance_results.items():
    faster = 'SQL' if times['sql'] < times['pandas'] else 'Pandas'
    ratio = min(times['sql'], times['pandas']) / max(times['sql'], times['pandas'])
    print(f"  ‚Ä¢ {test_name.replace('_', ' ').title()}: {faster} wins ({ratio*100:.1f}% faster)")

print("\nüí° Key Takeaways:")
print("  ‚Ä¢ SQL excels at: Complex joins, window functions, set operations")
print("  ‚Ä¢ Pandas excels at: Data cleaning, statistical analysis, visualization prep")
print("  ‚Ä¢ Best practice: Use SQL for data extraction, pandas for analysis")
print("  ‚Ä¢ Performance varies by dataset size and query complexity")

## 6. Error Handling and Best Practices

Production database applications require robust error handling and connection management.

In [None]:
def demonstrate_error_handling():
    """
    Demonstrate proper error handling techniques for database operations.
    """
    print("üõ°Ô∏è Database Error Handling and Best Practices")
    print("\n" + "="*60)
    
    # Example 1: Handling SQL syntax errors
    print("\n‚ùå Example 1: SQL Syntax Error Handling")
    try:
        # Intentional syntax error
        result = db.execute_query("""
            SELCT * FROM orders  -- Missing 'E' in SELECT
            WHERE order_status = 'delivered'
        """)
    except Exception as e:
        print(f"‚úÖ Caught SQL syntax error: {type(e).__name__}")
        print(f"   Error message: {str(e)[:100]}...")
    
    # Example 2: Handling table/column not found
    print("\nüîç Example 2: Table/Column Not Found Error")
    try:
        result = db.execute_query("""
            SELECT customer_id, nonexistent_column 
            FROM customers
        """)
    except Exception as e:
        print(f"‚úÖ Caught column error: {type(e).__name__}")
        print(f"   Error message: {str(e)[:100]}...")
    
    # Example 3: Parameterized queries (SQL injection prevention)
    print("\nüîí Example 3: Safe Parameterized Queries")
    
    def safe_customer_lookup(customer_state):
        """
        Safely query customers by state using parameterized queries.
        """
        try:
            query = """
                SELECT customer_id, customer_city, customer_state
                FROM customers 
                WHERE customer_state = :state
                LIMIT 5
            """
            result = db.execute_query(query, params={'state': customer_state})
            return result
        except Exception as e:
            print(f"‚ùå Query failed: {e}")
            return pd.DataFrame()
    
    # Test safe query
    safe_result = safe_customer_lookup('SP')
    print(f"‚úÖ Safe query returned {len(safe_result)} customers from SP state")
    
    # Example 4: Connection management with context managers
    print("\nüîå Example 4: Proper Connection Management")
    
    class SafeDatabaseQuery:
        """
        Context manager for safe database operations.
        """
        def __init__(self, engine):
            self.engine = engine
            self.connection = None
        
        def __enter__(self):
            self.connection = self.engine.connect()
            return self.connection
        
        def __exit__(self, exc_type, exc_val, exc_tb):
            if self.connection:
                self.connection.close()
            if exc_type:
                print(f"‚ùå Database error occurred: {exc_type.__name__}: {exc_val}")
            return False  # Don't suppress exceptions
    
    # Use context manager for safe operations
    try:
        with SafeDatabaseQuery(db.engine) as conn:
            result = pd.read_sql(
                text("SELECT COUNT(*) as total_orders FROM orders"), 
                conn
            )
            print(f"‚úÖ Context manager query successful: {result.iloc[0, 0]:,} total orders")
    except Exception as e:
        print(f"‚ùå Context manager caught error: {e}")
    
    # Example 5: Data validation
    print("\n‚úÖ Example 5: Data Validation Best Practices")
    
    def validate_query_result(df, expected_columns=None, min_rows=0):
        """
        Validate query results meet business requirements.
        """
        validations = []
        
        # Check if DataFrame is empty
        if df.empty:
            validations.append("‚ùå Query returned no data")
        else:
            validations.append(f"‚úÖ Query returned {len(df):,} rows")
        
        # Check minimum row count
        if len(df) < min_rows:
            validations.append(f"‚ö†Ô∏è Row count ({len(df)}) below minimum ({min_rows})")
        
        # Check expected columns
        if expected_columns:
            missing_cols = set(expected_columns) - set(df.columns)
            if missing_cols:
                validations.append(f"‚ùå Missing columns: {missing_cols}")
            else:
                validations.append("‚úÖ All expected columns present")
        
        # Check for null values in key columns
        null_counts = df.isnull().sum()
        if null_counts.any():
            validations.append(f"‚ö†Ô∏è Null values found: {null_counts[null_counts > 0].to_dict()}")
        else:
            validations.append("‚úÖ No null values detected")
        
        return validations
    
    # Test validation
    test_query = db.execute_query("""
        SELECT customer_id, customer_state, customer_city
        FROM customers
        LIMIT 100
    """)
    
    validations = validate_query_result(
        test_query, 
        expected_columns=['customer_id', 'customer_state', 'customer_city'],
        min_rows=50
    )
    
    print("Query validation results:")
    for validation in validations:
        print(f"  {validation}")
    
    return validations

# Run error handling demonstration
error_handling_results = demonstrate_error_handling()

print("\nüìö Database Best Practices Summary:")
print("  üîí Always use parameterized queries to prevent SQL injection")
print("  üõ°Ô∏è Implement comprehensive error handling for all database operations")
print("  üîå Use connection context managers to ensure proper resource cleanup")
print("  ‚úÖ Validate query results before processing in business logic")
print("  üìä Log query performance for optimization opportunities")
print("  üîÑ Implement retry logic for transient connection issues")
print("  üìù Document query patterns and business logic for team maintenance")

## 7. Key Takeaways and Next Steps

### What We've Accomplished:

1. **Database Setup and Connection Management**
   - Loaded real business data into SQLite database
   - Established professional connection patterns
   - Implemented proper resource management

2. **SQL vs Pandas Comparison**
   - Basic queries and aggregations
   - Complex business intelligence analysis
   - Performance trade-offs and use cases

3. **Advanced SQL Techniques**
   - Window functions for business analytics
   - Common Table Expressions (CTEs)
   - Complex joins and subqueries

4. **Production-Ready Practices**
   - Error handling and validation
   - Parameterized queries for security
   - Connection pooling and resource management

### Business Value:

- **Performance**: Leverage database engines for heavy computation
- **Scalability**: Handle datasets larger than memory
- **Integration**: Connect to existing business systems
- **Security**: Proper authentication and query sanitization
- **Maintenance**: Centralized business logic in database layer

### When to Use SQL vs Pandas:

**Use SQL for:**
- Data extraction and filtering
- Complex joins across multiple tables
- Window functions and analytical queries
- Set operations (UNION, INTERSECT, EXCEPT)
- Large dataset aggregations

**Use Pandas for:**
- Data cleaning and transformation
- Statistical analysis and modeling
- Visualization preparation
- Iterative data exploration
- Machine learning feature engineering

### Next Session Preview:
In Part 2, we'll explore:
- Advanced SQL query patterns for business intelligence
- Real-time data integration techniques
- Combining SQL results with pandas analysis
- Building automated reporting pipelines

**üéâ You now have the fundamental skills to bridge SQL databases with Python analytics for powerful business intelligence solutions!**

## 8. Practice Exercise

**Your Challenge! üöÄ**

**Business Scenario**: The Olist marketing team wants to understand customer behavior patterns to improve their email marketing campaigns.

**Your Task**: Create a SQL query that identifies:
1. Customers who have made multiple purchases
2. Their average time between orders
3. Their preferred product categories
4. Their average order value trend over time

**Requirements**:
- Use proper JOINs to combine multiple tables
- Include error handling for your query
- Validate your results
- Compare with an equivalent pandas approach

**Deliverable**: A customer segmentation analysis that the marketing team can use to personalize their campaigns.

In [None]:
# Your practice exercise solution here

def analyze_customer_behavior_patterns():
    """
    Your challenge: Create a comprehensive customer behavior analysis using SQL.
    
    Business Goal: Help marketing team understand customer patterns for better campaigns.
    
    Requirements:
    1. Multi-purchase customers analysis
    2. Average time between orders
    3. Preferred product categories
    4. Order value trends
    """
    
    # Step 1: Write your SQL query
    customer_behavior_sql = """
    -- Your SQL query here
    -- Think about:
    -- - Which tables need to be joined?
    -- - How to calculate time between orders?
    -- - How to identify preferred categories?
    -- - How to track value trends?
    """
    
    # Step 2: Execute with error handling
    try:
        # Your query execution here
        pass
    except Exception as e:
        print(f"Query failed: {e}")
        return None
    
    # Step 3: Validate results
    # Your validation logic here
    
    # Step 4: Compare with pandas approach
    # Your pandas equivalent here
    
    # Step 5: Generate marketing insights
    # Your business insights here
    
    return None  # Return your results

print("üéØ Customer Behavior Analysis Challenge")
print("\nüìß Marketing Use Case:")
print("Help the marketing team understand customer patterns for email campaigns")
print("\nüîç Analysis Requirements:")
print("  1. Multi-purchase customer identification")
print("  2. Purchase frequency patterns")
print("  3. Category preferences")
print("  4. Value trend analysis")
print("\nüí° Think about:")
print("  ‚Ä¢ Which customers are most valuable for retention campaigns?")
print("  ‚Ä¢ What product recommendations would be most effective?")
print("  ‚Ä¢ How can we identify customers at risk of churning?")

# Uncomment to run your solution:
# customer_analysis_results = analyze_customer_behavior_patterns()

# Clean up database connection
db.close()