# Customer Return Clustering Feature Engineering

This notebook creates comprehensive customer features for DBSCAN clustering analysis.
Designed to handle large datasets (7M+ rows, 25GB+) with efficient DuckDB processing.

In [36]:
import pandas as pd
import duckdb
import numpy as np
from pathlib import Path
import logging
from typing import Union, Optional
import gc
from datetime import datetime

class PrependFileHandler(logging.Handler):
    """Logging handler that keeps the newest record on the first line of a file.

    Every emitted record is written as a new first line, followed by the
    previous contents of the file, so the log reads newest-to-oldest.

    Note that each record re-reads and re-writes the whole file, so the cost
    of one log call grows with the size of the log.

    Parameters
    ----------
    filename : str or path-like
        File that receives the records. It is created on the first record
        if it does not exist yet.
    """

    def __init__(self, filename):
        super().__init__()
        self.filename = filename

    def emit(self, record):
        """Format ``record`` and insert it at the top of ``self.filename``.

        Any failure while formatting or touching the file is passed to
        ``handleError`` (the convention used by the standard library
        handlers) instead of propagating into the code that issued the log
        call.
        """
        try:
            log_entry = self.format(record)
            try:
                with open(self.filename, 'r', encoding='utf-8') as f:
                    old_content = f.read()
            except FileNotFoundError:
                # First record: there is nothing to keep below the new line.
                old_content = ''
            with open(self.filename, 'w', encoding='utf-8') as f:
                f.write(log_entry + '\n' + old_content)
        except Exception:
            self.handleError(record)

# Formatter whose timestamp reads hour:minute:day::month:two-digit-year,
# e.g. "14:05:09::03:24" for 14:05 on 9 March 2024.
class CustomFormatter(logging.Formatter):
    """Formatter that renders ``%(asctime)s`` as ``HH:MM:DD::MM:YY``."""

    def formatTime(self, record, datefmt=None):
        """Return the record's creation time as ``HH:MM:DD::MM:YY``.

        ``datefmt`` is accepted for compatibility with the base class but is
        not used; the layout is fixed. The time is converted with
        ``self.converter``.
        """
        stamp = self.converter(record.created)
        clock = "{:02}:{:02}".format(stamp.tm_hour, stamp.tm_min)
        day = "{:02}".format(stamp.tm_mday)
        month_year = "{:02}:{:02}".format(stamp.tm_mon, stamp.tm_year % 100)
        return clock + ":" + day + "::" + month_year

# Logging setup: a single handler that prepends each record to log.txt,
# stamped with the custom time layout.
log_file = 'log.txt'
formatter = CustomFormatter('%(asctime)s - %(levelname)s - %(message)s')
prepend_handler = PrependFileHandler(log_file)
prepend_handler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Start from a clean slate so re-running this cell does not stack handlers
# (which would duplicate every log line).
logger.handlers = []
logger.addHandler(prepend_handler)

In [37]:
def create_customer_clustering_features(
    file_path: Union[str, Path] = "data/base_returns_sku_metadata.csv",
    table_name: str = 'customer_data',
    features_table_name: str = 'customer_features',
    chunk_size: int = 100000,
    db_file: str = 'returns_v2.duckdb',
    force_recreate: bool = False,
    memory_limit: str = '32GB',
    threads: int = 16,
) -> duckdb.DuckDBPyConnection:
    """
    Create comprehensive customer clustering features from sales/returns data.

    Runs four steps against one DuckDB connection: load the raw file, build
    the intermediate summary tables, build the features table, and log
    validation statistics.

    Parameters:
    -----------
    file_path : str or Path
        Path to .xlsx or .csv file. The default is the path that previously
        sat in the annotation slot of this parameter (presumably intended as
        a default); it is written with a forward slash so it is not subject
        to backslash escape processing.
    table_name : str
        Name for raw data table in DuckDB
    features_table_name : str
        Name for features table in DuckDB
    chunk_size : int
        Rows per chunk for memory management (default 100k)
    db_file : str
        DuckDB file path (':memory:' for in-memory)
    force_recreate : bool
        Whether to recreate existing tables
    memory_limit : str
        Value passed to DuckDB's ``memory_limit`` setting (default '32GB')
    threads : int
        Value passed to DuckDB's ``threads`` setting (default 16)

    Returns:
    --------
    duckdb.DuckDBPyConnection
        Connected DuckDB instance with features table. The caller owns the
        connection and is responsible for closing it.

    Raises:
    -------
    Exception
        Any error raised by a step is logged and re-raised after the
        connection has been closed.
    """

    # Initialize DuckDB connection
    conn = duckdb.connect(db_file)

    try:
        # Configure DuckDB for large datasets. Done inside the try block so
        # that a rejected setting does not leave the connection open.
        conn.execute(f"SET memory_limit='{memory_limit}'")
        conn.execute(f"SET threads={int(threads)}")

        # Step 1: Load and prepare raw data
        logger.info(f"Loading data from {file_path}")
        _load_raw_data(conn, file_path, table_name, chunk_size, force_recreate)

        # Step 2: Create feature engineering tables
        logger.info("Creating intermediate feature tables")
        _create_intermediate_tables(conn, table_name)

        # Step 3: Generate all customer features
        logger.info("Generating customer clustering features")
        _create_customer_features(conn, features_table_name, force_recreate)

        # Step 4: Validate and optimize
        logger.info("Validating feature creation")
        _validate_features(conn, features_table_name)

        return conn

    except Exception as e:
        logger.error(f"Error in feature creation: {str(e)}")
        conn.close()
        raise

In [38]:
def _load_raw_data(conn, file_path, table_name, chunk_size, force_recreate):
    """Populate ``table_name`` from an .xlsx or .csv file and index it.

    If the table is already present it is left untouched unless
    ``force_recreate`` is set, in which case it is dropped and reloaded.
    Raises ``ValueError`` for any other file extension.
    """

    source = Path(file_path)

    # Look the table up in the catalog
    match_count = conn.execute(f"""
        SELECT COUNT(*) FROM information_schema.tables 
        WHERE table_name = '{table_name}'
    """).fetchone()[0]
    already_loaded = match_count > 0

    if already_loaded:
        if not force_recreate:
            logger.info(f"Table {table_name} already exists, skipping data load")
            return
        # Forced reload: remove the old copy first
        conn.execute(f"DROP TABLE IF EXISTS {table_name}")

    # Pick the loader from the (case-insensitive) extension
    extension = source.suffix.lower()
    if extension == '.xlsx':
        _load_excel_chunked(conn, source, table_name, chunk_size)
    elif extension == '.csv':
        _load_csv_direct(conn, source, table_name)
    else:
        raise ValueError(f"Unsupported file type: {source.suffix}")

    # Index the columns used by the downstream feature queries
    logger.info("Creating indexes on raw data table")
    index_columns = (
        ('customer', 'CUSTOMER_EMAILID'),
        ('order', 'SALES_ORDER_NO'),
        ('date', 'ORDER_DATE'),
        ('return', 'UNITS_RETURNED_FLAG'),
    )
    for label, column in index_columns:
        conn.execute(f"CREATE INDEX IF NOT EXISTS idx_{table_name}_{label} ON {table_name}({column})")


def _load_excel_chunked(conn, file_path, table_name, chunk_size):
    """Load an Excel file into ``table_name`` in slices of ``chunk_size`` rows.

    ``pd.read_excel`` has no ``chunksize`` option, so each slice is read with
    ``skiprows`` / ``nrows``: the header row is always kept and the data rows
    that were already loaded are skipped. This bounds the size of the
    DataFrame held in memory, at the cost of re-opening the workbook for
    every slice.

    The table is created from the first slice and later slices are appended.

    Raises
    ------
    ValueError
        If ``chunk_size`` is smaller than 1, or the workbook contains no
        data rows (so no table could be created).
    """

    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

    logger.info(f"Reading Excel file in chunks of {chunk_size:,} rows")

    rows_loaded = 0
    chunk_num = 0
    table_created = False

    while True:
        # Row 0 is the header; rows 1..rows_loaded were handled by earlier
        # iterations. The default argument freezes the current offset.
        chunk = pd.read_excel(
            file_path,
            skiprows=lambda row, done=rows_loaded: 1 <= row <= done,
            nrows=chunk_size,
        )
        rows_read = len(chunk)
        if rows_read == 0:
            break

        chunk_num += 1
        logger.info(f"Processing chunk {chunk_num} ({rows_read:,} rows)")

        # Clean and prepare chunk
        chunk = _clean_raw_data_chunk(chunk)

        # The SQL below refers to the local DataFrame by its variable name
        # ("chunk"); keep the two in sync.
        if not table_created:
            conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM chunk")
            table_created = True
        else:
            conn.execute(f"INSERT INTO {table_name} SELECT * FROM chunk")

        rows_loaded += rows_read

        # Clean up memory before reading the next slice
        del chunk
        gc.collect()

        # A short slice means the end of the sheet was reached.
        if rows_read < chunk_size:
            break

    if not table_created:
        raise ValueError(f"No data rows found in {file_path}")


def _load_csv_direct(conn, file_path, table_name):
    """Load a CSV file into ``table_name`` with DuckDB's native CSV reader.

    The selected columns are cleaned the same way ``_clean_raw_data_chunk``
    cleans the Excel input, so both loaders produce the same table:

    * ``Q_CLS_ID`` / ``Q_SKU_DESC`` are renamed to ``CATEGORY_ID`` /
      ``PRODUCT_NAME``.
    * ``SALES_QTY`` / ``RETURN_QTY`` become numeric, with unparseable or
      missing values replaced by 0.
    * ``UNITS_RETURNED_FLAG`` becomes a BOOLEAN. It is true for the text
      'YES' (any case) and for a value the CSV sniffer already typed as
      boolean true. NOTE(review): assumes the flag is encoded as Yes/No text
      as in the Excel input - confirm against the actual CSV.
    * ``RETURN_DATE`` becomes a TIMESTAMP; the '-' placeholder and any other
      unparseable value become NULL.
    * ``ORDER_DATE`` is cast strictly, so a malformed order date raises.
    """

    logger.info("Loading CSV using DuckDB's native CSV reader")

    # Double any single quote so the path is a valid SQL string literal.
    csv_literal = str(file_path).replace("'", "''")

    # Use DuckDB's efficient CSV reading
    conn.execute(f"""
        CREATE TABLE {table_name} AS 
        SELECT 
            CUSTOMER_EMAILID,
            SALES_ORDER_NO,
            Q_GMM_ID,
            Q_CLS_ID AS CATEGORY_ID,
            SKU,
            Q_SKU_DESC AS PRODUCT_NAME,
            COALESCE(TRY_CAST(SALES_QTY AS DOUBLE), 0) AS SALES_QTY,
            COALESCE(
                UPPER(TRIM(CAST(UNITS_RETURNED_FLAG AS VARCHAR))) IN ('YES', 'TRUE'),
                FALSE
            ) AS UNITS_RETURNED_FLAG,
            RETURN_NO,
            COALESCE(TRY_CAST(RETURN_QTY AS DOUBLE), 0) AS RETURN_QTY,
            CAST(ORDER_DATE AS TIMESTAMP) AS ORDER_DATE,
            -- TRY_CAST turns '-' (and anything else that is not a timestamp)
            -- into NULL, whatever type the sniffer chose for the column.
            TRY_CAST(RETURN_DATE AS TIMESTAMP) AS RETURN_DATE
        FROM read_csv_auto('{csv_literal}')
    """)


def _clean_raw_data_chunk(df):
    """Clean and standardize a data chunk."""
    
    # Rename columns to standard names
    df = df.rename(columns={
        'Q_CLS_ID': 'CATEGORY_ID',
        'Q_SKU_DESC': 'PRODUCT_NAME'
    })
    
    # Clean return dates (convert '-' to None)
    df['RETURN_DATE'] = df['RETURN_DATE'].replace('-', None)
    
    # Ensure date columns are datetime
    df['ORDER_DATE'] = pd.to_datetime(df['ORDER_DATE'])
    df['RETURN_DATE'] = pd.to_datetime(df['RETURN_DATE'], errors='coerce')
    
    # Clean numeric columns
    df['SALES_QTY'] = pd.to_numeric(df['SALES_QTY'], errors='coerce').fillna(0)
    df['RETURN_QTY'] = pd.to_numeric(df['RETURN_QTY'], errors='coerce').fillna(0)
    
    # Standardize return flag
    df['UNITS_RETURNED_FLAG'] = df['UNITS_RETURNED_FLAG'].str.upper().eq('YES')
    
    return df

In [39]:
def _create_intermediate_tables(conn, table_name, recent_window_days=90):
    """Create intermediate tables for efficient feature calculation.

    Builds (or replaces) four tables from ``table_name``:

    * ``customer_order_summary`` - one row per customer / order / order date
      with item, quantity and return counts.
    * ``customer_item_summary`` - one row per customer / SKU / category with
      purchase and return totals.
    * ``return_timing_analysis`` - one row per returned line that has a
      return date, with the days between order and return.
    * ``reference_metadata`` - a single row holding the newest order date
      (``reference_date``) and ``recent_cutoff_date``, which lies
      ``recent_window_days`` before it.

    Parameters
    ----------
    conn : duckdb.DuckDBPyConnection
        Open connection containing ``table_name``.
    table_name : str
        Raw data table to summarise.
    recent_window_days : int
        Length of the "recent" window in days (default 90).
    """

    # Get reference date (newest order date) - used for logging only; the
    # stored value is computed inside DuckDB below.
    reference_date = conn.execute(f"SELECT MAX(ORDER_DATE) FROM {table_name}").fetchone()[0]
    logger.info(f"Using reference date: {reference_date}")
    if reference_date is None:
        logger.warning(f"Table {table_name} has no order dates; recency features will be empty")

    # Create customer order summary
    logger.info("Creating customer order summary")
    conn.execute(f"""
        CREATE OR REPLACE TABLE customer_order_summary AS
        SELECT 
            CUSTOMER_EMAILID,
            SALES_ORDER_NO,
            ORDER_DATE,
            COUNT(*) as items_in_order,
            SUM(SALES_QTY) as total_qty_ordered,
            COUNT(DISTINCT SKU) as unique_skus_in_order,
            COUNT(DISTINCT CATEGORY_ID) as unique_categories_in_order,
            SUM(CASE WHEN UNITS_RETURNED_FLAG THEN 1 ELSE 0 END) as items_returned_in_order,
            SUM(CASE WHEN UNITS_RETURNED_FLAG THEN RETURN_QTY ELSE 0 END) as qty_returned_in_order
            -- TODO: Add order value calculations when price data available
            -- SUM(SALES_QTY * UNIT_PRICE) as order_value,
            -- SUM(CASE WHEN UNITS_RETURNED_FLAG THEN RETURN_QTY * UNIT_PRICE ELSE 0 END) as return_value
        FROM {table_name}
        GROUP BY CUSTOMER_EMAILID, SALES_ORDER_NO, ORDER_DATE
    """)

    # Create customer item-level summary
    logger.info("Creating customer item summary")
    conn.execute(f"""
        CREATE OR REPLACE TABLE customer_item_summary AS
        SELECT 
            CUSTOMER_EMAILID,
            SKU,
            CATEGORY_ID,
            COUNT(*) as purchase_frequency,
            SUM(SALES_QTY) as total_qty_purchased,
            SUM(CASE WHEN UNITS_RETURNED_FLAG THEN RETURN_QTY ELSE 0 END) as total_qty_returned,
            COUNT(CASE WHEN UNITS_RETURNED_FLAG THEN 1 END) as return_frequency,
            MIN(ORDER_DATE) as first_purchase_date,
            MAX(ORDER_DATE) as last_purchase_date,
            AVG(CASE WHEN UNITS_RETURNED_FLAG AND RETURN_DATE IS NOT NULL 
                     THEN EXTRACT(DAY FROM (RETURN_DATE - ORDER_DATE)) END) as avg_days_to_return
        FROM {table_name}
        GROUP BY CUSTOMER_EMAILID, SKU, CATEGORY_ID
    """)

    # Create return timing analysis
    logger.info("Creating return timing analysis")
    conn.execute(f"""
        CREATE OR REPLACE TABLE return_timing_analysis AS
        SELECT 
            CUSTOMER_EMAILID,
            SALES_ORDER_NO,
            SKU,
            ORDER_DATE,
            RETURN_DATE,
            EXTRACT(DAY FROM (RETURN_DATE - ORDER_DATE)) as days_to_return,
            EXTRACT(MONTH FROM ORDER_DATE) as order_month,
            EXTRACT(MONTH FROM RETURN_DATE) as return_month
        FROM {table_name}
        WHERE UNITS_RETURNED_FLAG AND RETURN_DATE IS NOT NULL
    """)

    # Store reference date for feature calculations. Computing it in SQL
    # avoids formatting a Python value into the statement, which produced
    # the invalid literal 'None'::TIMESTAMP when the table had no dates.
    window_days = int(recent_window_days)
    conn.execute(f"""
        CREATE OR REPLACE TABLE reference_metadata AS
        SELECT 
            MAX(ORDER_DATE)::TIMESTAMP as reference_date,
            MAX(ORDER_DATE)::TIMESTAMP - INTERVAL '{window_days} days' as recent_cutoff_date
        FROM {table_name}
    """)

In [40]:
def _create_customer_features(conn, features_table_name, force_recreate):
    """Create comprehensive customer clustering features using SQL.

    Reads ``customer_order_summary``, ``customer_item_summary``,
    ``return_timing_analysis`` and ``reference_metadata`` and writes one row
    per customer to ``features_table_name``. If that table already exists it
    is kept unless ``force_recreate`` is set.

    Implementation notes:

    * Each source table is aggregated to one row per customer in its own
      CTE before being joined. Joining the order rows and the item rows
      directly on the customer id pairs every order row with every item row,
      which multiplies the SUM-based features.
    * CONSECUTIVE_RETURNS / AVG_CONSECUTIVE_RETURNS are computed in SQL with
      window functions: orders are numbered per customer in date order and
      runs of consecutive orders containing a return are measured (longest
      run and mean run length; 0 when the customer never returned anything).
    * Every numeric feature is non-NULL. RECENT_VS_AVG_RATIO falls back to
      1.0 when the historical return rate is zero. BEHAVIOR_STABILITY_SCORE
      is 0.5 when there are fewer than 3 orders or no recent orders, 1.0 when
      the customer has no returns at all, and otherwise
      1 - |recent rate - overall rate| / overall rate.
    """

    # Check if features table exists
    table_exists = conn.execute(f"""
        SELECT COUNT(*) FROM information_schema.tables 
        WHERE table_name = '{features_table_name}'
    """).fetchone()[0] > 0

    if table_exists and not force_recreate:
        logger.info(f"Features table {features_table_name} already exists, skipping creation")
        return

    if table_exists and force_recreate:
        conn.execute(f"DROP TABLE IF EXISTS {features_table_name}")

    logger.info("Creating comprehensive customer features")

    # Create the main features table with all categories
    conn.execute(f"""
        CREATE TABLE {features_table_name} AS
        WITH customer_base AS (
            SELECT DISTINCT CUSTOMER_EMAILID
            FROM customer_order_summary
        ),

        -- Order-level aggregates, one row per customer
        order_agg AS (
            SELECT
                cos.CUSTOMER_EMAILID,
                COUNT(DISTINCT cos.SALES_ORDER_NO) AS n_orders,
                AVG(cos.items_in_order) AS avg_order_size,
                SUM(cos.items_returned_in_order) AS items_returned,
                SUM(CASE WHEN cos.items_returned_in_order > 0 THEN 1 ELSE 0 END) AS orders_with_returns,
                MIN(cos.ORDER_DATE) AS first_order_date,
                MAX(cos.ORDER_DATE) AS last_order_date,
                COUNT(DISTINCT CASE WHEN cos.ORDER_DATE >= rm.recent_cutoff_date
                                    THEN cos.SALES_ORDER_NO END) AS recent_orders,
                SUM(CASE WHEN cos.ORDER_DATE >= rm.recent_cutoff_date
                         THEN cos.items_returned_in_order ELSE 0 END) AS recent_items_returned
            FROM customer_order_summary cos
            CROSS JOIN reference_metadata rm
            GROUP BY cos.CUSTOMER_EMAILID
        ),

        -- Item-level aggregates, one row per customer
        item_agg AS (
            SELECT
                cis.CUSTOMER_EMAILID,
                COUNT(DISTINCT cis.SKU) AS sku_nunique,
                COUNT(cis.SKU) AS item_rows,
                SUM(cis.return_frequency) AS return_events,
                AVG(cis.total_qty_purchased) AS avg_qty_purchased,
                SUM(cis.total_qty_purchased) AS qty_purchased,
                SUM(cis.total_qty_returned) AS qty_returned,
                COUNT(DISTINCT CASE WHEN cis.return_frequency > 0 THEN cis.SKU END) AS returned_sku_nunique,
                AVG(CASE WHEN cis.total_qty_purchased > 0
                         THEN cis.total_qty_returned::FLOAT / cis.total_qty_purchased END) AS item_return_intensity
            FROM customer_item_summary cis
            GROUP BY cis.CUSTOMER_EMAILID
        ),

        -- Return timing aggregates, one row per customer with returns
        timing_agg AS (
            SELECT
                rta.CUSTOMER_EMAILID,
                AVG(rta.days_to_return) AS mean_days_to_return,
                STDDEV(rta.days_to_return) AS spread_days_to_return
            FROM return_timing_analysis rta
            GROUP BY rta.CUSTOMER_EMAILID
        ),

        -- Consecutive-return streaks (gaps-and-islands)
        -- 1) one row per order with a 0/1 "contains a return" flag
        order_flags AS (
            SELECT
                CUSTOMER_EMAILID,
                SALES_ORDER_NO,
                MIN(ORDER_DATE) AS order_ts,
                MAX(CASE WHEN items_returned_in_order > 0 THEN 1 ELSE 0 END) AS has_return
            FROM customer_order_summary
            GROUP BY CUSTOMER_EMAILID, SALES_ORDER_NO
        ),
        -- 2) number the orders overall and within each flag value; the
        --    difference is constant inside a run of equal flags
        order_sequence AS (
            SELECT
                CUSTOMER_EMAILID,
                has_return,
                ROW_NUMBER() OVER (
                    PARTITION BY CUSTOMER_EMAILID
                    ORDER BY order_ts, SALES_ORDER_NO
                ) AS seq_all,
                ROW_NUMBER() OVER (
                    PARTITION BY CUSTOMER_EMAILID, has_return
                    ORDER BY order_ts, SALES_ORDER_NO
                ) AS seq_in_flag
            FROM order_flags
        ),
        -- 3) length of every run of orders that contain a return
        return_streaks AS (
            SELECT
                CUSTOMER_EMAILID,
                seq_all - seq_in_flag AS streak_id,
                COUNT(*) AS streak_length
            FROM order_sequence
            WHERE has_return = 1
            GROUP BY CUSTOMER_EMAILID, seq_all - seq_in_flag
        ),
        streak_agg AS (
            SELECT
                CUSTOMER_EMAILID,
                MAX(streak_length) AS max_streak,
                AVG(streak_length) AS avg_streak
            FROM return_streaks
            GROUP BY CUSTOMER_EMAILID
        )

        -- Combine all feature categories
        SELECT
            cb.CUSTOMER_EMAILID,

            -- Basic Volume Metrics
            COALESCE(oa.n_orders, 0) AS SALES_ORDER_NO_nunique,
            COALESCE(ia.sku_nunique, 0) AS SKU_nunique,
            COALESCE(ia.return_events, 0) AS ITEMS_RETURNED_COUNT,
            COALESCE(ia.avg_qty_purchased, 0) AS SALES_QTY_mean,
            COALESCE(oa.avg_order_size, 0) AS AVG_ORDER_SIZE,

            -- Return Behavior Patterns
            COALESCE(ia.return_events::FLOAT / NULLIF(ia.item_rows, 0), 0) AS RETURN_RATE,
            COALESCE(ia.qty_returned::FLOAT / NULLIF(ia.qty_purchased, 0), 0) AS RETURN_RATIO,
            COALESCE(ia.returned_sku_nunique, 0) AS RETURN_PRODUCT_VARIETY,
            COALESCE(oa.items_returned::FLOAT / NULLIF(oa.n_orders, 0), 0) AS AVG_RETURNS_PER_ORDER,
            COALESCE(oa.orders_with_returns::FLOAT / NULLIF(oa.n_orders, 0), 0) AS RETURN_FREQUENCY_RATIO,
            COALESCE(ia.item_return_intensity, 0) AS RETURN_INTENSITY,
            COALESCE(sa.max_streak, 0) AS CONSECUTIVE_RETURNS,
            COALESCE(sa.avg_streak, 0) AS AVG_CONSECUTIVE_RETURNS,

            -- Temporal & Timing Patterns
            COALESCE(
                EXTRACT(DAY FROM (oa.last_order_date - oa.first_order_date)), 0
            ) AS CUSTOMER_LIFETIME_DAYS,
            COALESCE(ta.mean_days_to_return, 0) AS AVG_DAYS_TO_RETURN,
            COALESCE(ta.spread_days_to_return, 0) AS RETURN_TIMING_SPREAD,
            CASE
                WHEN EXTRACT(DAY FROM (oa.last_order_date - oa.first_order_date)) <= 90 THEN 'New'
                WHEN EXTRACT(DAY FROM (oa.last_order_date - oa.first_order_date)) <= 180 THEN 'Growing'
                WHEN EXTRACT(DAY FROM (oa.last_order_date - oa.first_order_date)) <= 365 THEN 'Mature'
                ELSE 'Established'
            END AS CUSTOMER_TENURE_STAGE,

            -- Trend & Recency Analysis
            COALESCE(oa.recent_orders, 0) AS RECENT_ORDERS,
            COALESCE(oa.recent_items_returned, 0) AS RECENT_RETURNS,

            -- Recent vs average ratio (trend indicator); 1.0 = no change
            CASE
                WHEN oa.n_orders > 0 AND oa.recent_orders > 0
                THEN COALESCE(
                    (oa.recent_items_returned::FLOAT / oa.recent_orders)
                    / NULLIF(oa.items_returned::FLOAT / oa.n_orders, 0),
                    1.0
                )
                ELSE 1.0
            END AS RECENT_VS_AVG_RATIO,

            -- TODO: Add trend calculations for order and return frequency
            1.0 AS ORDER_FREQUENCY_TREND,  -- Placeholder
            1.0 AS RETURN_FREQUENCY_TREND, -- Placeholder

            -- Behavior stability (consistency of recent vs historical)
            CASE
                WHEN oa.n_orders IS NULL OR oa.n_orders < 3 OR oa.recent_orders = 0 THEN 0.5
                WHEN oa.items_returned = 0 THEN 1.0
                ELSE 1.0 - ABS(
                    (oa.recent_items_returned::FLOAT / oa.recent_orders)
                    - (oa.items_returned::FLOAT / oa.n_orders)
                ) / (oa.items_returned::FLOAT / oa.n_orders)
            END AS BEHAVIOR_STABILITY_SCORE

            -- TODO: Add when value data available
            -- MONETARY VALUE PATTERNS
            --   AVG_ORDER_VALUE, AVG_RETURN_VALUE, HIGH_VALUE_RETURN_AFFINITY
            -- PRODUCT & CATEGORY INTELLIGENCE
            --   PRODUCT_CATEGORY_LOYALTY, CATEGORY_DIVERSITY_SCORE,
            --   CATEGORY_LOYALTY_SCORE, HIGH_RETURN_CATEGORY_AFFINITY,
            --   HIGH_RISK_PRODUCT_AFFINITY, HIGH_RISK_RETURN_AFFINITY
            -- ADJACENCY & REPEAT BEHAVIOR
            --   SKU_ADJACENCY_ORDERS, SKU_ADJACENCY_RETURNS,
            --   SKU_ADJACENCY_TIMING, SKU_ADJACENCY_RETURN_TIMING
            -- SEASONAL & TREND SUSCEPTIBILITY
            --   SEASONAL_SUSCEPTIBILITY_RETURNS, SEASONAL_SUSCEPTIBILITY_ORDERS,
            --   TREND_PRODUCT_CATEGORY_RETURN_RATE, TREND_SKU_RETURN_RATE,
            --   TREND_PRODUCT_CATEGORY_ORDER_RATE, TREND_SKU_ORDER_RATE

        FROM customer_base cb
        LEFT JOIN order_agg oa ON cb.CUSTOMER_EMAILID = oa.CUSTOMER_EMAILID
        LEFT JOIN item_agg ia ON cb.CUSTOMER_EMAILID = ia.CUSTOMER_EMAILID
        LEFT JOIN timing_agg ta ON cb.CUSTOMER_EMAILID = ta.CUSTOMER_EMAILID
        LEFT JOIN streak_agg sa ON cb.CUSTOMER_EMAILID = sa.CUSTOMER_EMAILID
    """)

    # Create index on customer email for performance
    conn.execute(f"CREATE INDEX idx_{features_table_name}_customer ON {features_table_name}(CUSTOMER_EMAILID)")

    logger.info(f"Created features table '{features_table_name}' with basic feature set")
    logger.info("Advanced features (monetary, category intelligence, adjacency, seasonal) are commented out")
    logger.info("Uncomment and implement these sections when additional data becomes available")

In [41]:
def _validate_features(conn, features_table_name):
    """Validate feature creation and provide summary statistics.

    Logs the row count, a handful of averages and shares, and whether any of
    the key features (order count, return rate, lifetime days) contain NULLs.
    Nothing is returned; problems are reported as log warnings.

    An empty features table is reported with a warning and the summary is
    skipped, because the averages are undefined and the percentage shares
    would divide by zero.
    """

    # Get basic table info
    row_count = conn.execute(f"SELECT COUNT(*) FROM {features_table_name}").fetchone()[0]
    logger.info(f"Features table created with {row_count:,} customers")

    if row_count == 0:
        logger.warning("Features table is empty; skipping feature summary and null checks")
        return

    # Get feature summary
    feature_summary = conn.execute(f"""
        SELECT 
            COUNT(*) as total_customers,
            AVG(SALES_ORDER_NO_nunique) as avg_orders_per_customer,
            AVG(SKU_nunique) as avg_skus_per_customer,
            AVG(RETURN_RATE) as avg_return_rate,
            COUNT(CASE WHEN ITEMS_RETURNED_COUNT > 0 THEN 1 END) as customers_with_returns,
            COUNT(CASE WHEN RECENT_ORDERS > 0 THEN 1 END) as recent_active_customers
        FROM {features_table_name}
    """).fetchone()
    (
        total_customers,
        avg_orders,
        avg_skus,
        avg_return_rate,
        customers_with_returns,
        recently_active,
    ) = feature_summary

    logger.info("Feature Summary:")
    logger.info(f"  Total customers: {total_customers:,}")
    logger.info(f"  Avg orders per customer: {avg_orders:.2f}")
    logger.info(f"  Avg SKUs per customer: {avg_skus:.2f}")
    logger.info(f"  Avg return rate: {avg_return_rate:.2%}")
    logger.info(f"  Customers with returns: {customers_with_returns:,} ({customers_with_returns/total_customers:.1%})")
    logger.info(f"  Recently active customers: {recently_active:,} ({recently_active/total_customers:.1%})")

    # Check for any null values in key features
    null_check = conn.execute(f"""
        SELECT 
            SUM(CASE WHEN SALES_ORDER_NO_nunique IS NULL THEN 1 ELSE 0 END) as null_orders,
            SUM(CASE WHEN RETURN_RATE IS NULL THEN 1 ELSE 0 END) as null_return_rate,
            SUM(CASE WHEN CUSTOMER_LIFETIME_DAYS IS NULL THEN 1 ELSE 0 END) as null_lifetime
        FROM {features_table_name}
    """).fetchone()

    # Each entry is a count, so any non-zero entry means NULLs were found.
    if any(null_check):
        logger.warning(f"Found null values in key features: {null_check}")
    else:
        logger.info("No null values found in key features")

In [42]:
# Helper functions for complex feature calculations
# These would need to be implemented as DuckDB functions or separate queries

def _calculate_consecutive_returns(customer_id):
    """Calculate maximum consecutive orders with returns for a customer."""
    # TODO: Implement as window function query
    return 0

def _calculate_avg_consecutive_returns(customer_id):
    """Calculate average consecutive return streaks for a customer."""
    # TODO: Implement as window function query
    return 0

In [43]:
# # Example usage
# if __name__ == "__main__":
    
#     # Example: Create features from Excel file
#     file_path = "RETRO_SAMPLE Copy.xlsx"  # Update with your file path
    
#     # Create features (adjust chunk_size based on your system)
#     conn = create_customer_clustering_features(
#         file_path=file_path,
#         table_name='customer_transactions',
#         features_table_name='customer_clustering_features',
#         chunk_size=50000,  # Adjust based on available RAM
#         db_file='customer_features.db',  # Use file-based DB for large datasets
#         force_recreate=False
#     )
    
#     # View sample of features
#     sample_features = conn.execute("""
#         SELECT * FROM customer_clustering_features 
#         ORDER BY SALES_ORDER_NO_nunique DESC 
#         LIMIT 10
#     """).df()
    
#     print("\nSample Customer Features:")
#     print(sample_features)
    
#     # Export features for analysis
#     # features_df = conn.execute("SELECT * FROM customer_clustering_features").df()
#     # features_df.to_csv('customer_clustering_features.csv', index=False)
    
#     conn.close()