In [1]:
# L&T Finance Pearl Challenge - Farmer Income Prediction

# ==============================================================================
# 01_data_loading_conversion.ipynb
# Purpose: Load raw data, verify, convert, and save for further analysis/modeling
# ==============================================================================

import pandas as pd
import numpy as np
import os
import requests
import json
import pickle
import time
import warnings
from pathlib import Path
from urllib.parse import urlparse
from typing import Dict, List, Tuple, Any, Union
import re

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore')

# Configuration
DATA_URL = "https://www.ltfraise.com/Pdf/Pearl%20Challenge%20data%20with%20dictionary_For_Share_v4.xlsx"

BASE_DIR = Path('../data')
RAW_DATA_DIR = BASE_DIR / "raw"
EXTERNAL_DATA_DIR = BASE_DIR / "external" / "original_data"

print("\n Directory structure created successfully!")
print(f" Base directory: {BASE_DIR.absolute()}")
print(f"External directory: {EXTERNAL_DATA_DIR.absolute()}")


 Directory structure created successfully!
 Base directory: c:\LnT Project\lt_finance_farmer_prediction\notebooks\..\data
External directory: c:\LnT Project\lt_finance_farmer_prediction\notebooks\..\data\external\original_data


In [2]:
# =============================================================================
# FILE DOWNLOAD AND VERIFICATION WITH ERROR HANDLING
# =============================================================================
# Purpose: Download Excel file with retry logic and verify integrity
# Handles: Network errors, file corruption, path issues
# Output: Verified Excel file ready for loading
# =============================================================================

def download_file_with_retry(url: str, filepath: Path, max_retries: int = 3) -> bool:
    """
    Download file with retry logic and error handling
    
    Args:
        url: Source URL for download
        filepath: Destination file path
        max_retries: Maximum number of retry attempts
    
    Returns:
        bool: True if download successful, False otherwise
    """
    for attempt in range(max_retries):
        try:
            print(f" Download attempt {attempt + 1}/{max_retries}...")
            
            # Configure request with headers to avoid blocking
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            
            response = requests.get(url, headers=headers, timeout=30, stream=True)
            response.raise_for_status()
            
            # Write file in chunks to handle large files
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            
            # Verify file size
            file_size = filepath.stat().st_size
            if file_size < 1000:  # File too small, likely error page
                print(f" Downloaded file too small ({file_size} bytes), retrying...")
                continue
                
            print(f" Download successful! File size: {file_size:,} bytes")
            return True
            
        except requests.exceptions.RequestException as e:
            print(f" Download attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                print(f" Waiting 5 seconds before retry...")
                time.sleep(5)
        except Exception as e:
            print(f" Unexpected error during download: {e}")
            break
    
    return False

def verify_excel_file(filepath: Path) -> bool:
    """
    Verify Excel file integrity and structure
    
    Args:
        filepath: Path to Excel file
        
    Returns:
        bool: True if file is valid Excel with expected sheets
    """
    try:
        # Try to read Excel file metadata
        excel_file = pd.ExcelFile(filepath)
        sheet_names = excel_file.sheet_names
        
        print(f" Found sheets: {sheet_names}")
        
        # Check for expected sheets
        expected_sheets = ['TrainData', 'TestData', 'Dictionary']
        missing_sheets = [sheet for sheet in expected_sheets if sheet not in sheet_names]
        
        if missing_sheets:
            print(f" Missing expected sheets: {missing_sheets}")
            return False
        
        # Quick check - try to read first few rows of each sheet
        for sheet in expected_sheets:
            sample_df = pd.read_excel(filepath, sheet_name=sheet, nrows=5)
            print(f" Sheet '{sheet}': {sample_df.shape[1]} columns, sample read successful")
        
        return True
        
    except Exception as e:
        print(f" Excel file verification failed: {e}")
        return False

# Main download logic
excel_filepath = EXTERNAL_DATA_DIR / "Pearl Challenge data with dictionary_For_Share_v4.xlsx"

print(" Checking for existing Excel file...")
if excel_filepath.exists():
    print(f" File already exists: {excel_filepath}")
    print(f" File size: {excel_filepath.stat().st_size:,} bytes")
    
    # Verify existing file
    if verify_excel_file(excel_filepath):
        print(" Existing file verification passed!")
        file_ready = True
    else:
        print(" Existing file verification failed, will re-download...")
        file_ready = False
else:
    print(" File not found, will download...")
    file_ready = False

# Download if needed
if not file_ready:
    print(f"\n Starting download from: {DATA_URL}")
    
    if download_file_with_retry(DATA_URL, excel_filepath):
        if verify_excel_file(excel_filepath):
            print(" Download and verification successful!")
            file_ready = True
        else:
            print(" Downloaded file failed verification!")
            file_ready = False
    else:
        print(" Download failed after all retries!")
        file_ready = False

if not file_ready:
    raise FileNotFoundError("Unable to obtain valid Excel file. Please check URL and network connection.")

print(f"\n Excel file ready for processing: {excel_filepath}")
print(f" Final file size: {excel_filepath.stat().st_size:,} bytes")

 Checking for existing Excel file...
 File already exists: ..\data\external\original_data\Pearl Challenge data with dictionary_For_Share_v4.xlsx
 File size: 46,097,184 bytes
 Found sheets: ['TrainData', 'TestData', 'Dictionary']
 Sheet 'TrainData': 105 columns, sample read successful
 Sheet 'TestData': 104 columns, sample read successful
 Sheet 'Dictionary': 3 columns, sample read successful
 Existing file verification passed!

 Excel file ready for processing: ..\data\external\original_data\Pearl Challenge data with dictionary_For_Share_v4.xlsx
 Final file size: 46,097,184 bytes


In [3]:
# =============================================================================
# EXCEL DATA LOADING WITH TYPE MANAGEMENT
# =============================================================================
# Purpose: Load 3 Excel sheets with proper dtype handling and type conversion
# Handles: Mixed data types, object columns, categorical variables, encoding
# Output: Clean DataFrames with consistent data types
# =============================================================================

def convert_mixed_dtype_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert mixed data type columns to appropriate types for processing
    
    Args:
        df: DataFrame with potentially mixed dtypes
        
    Returns:
        pd.DataFrame: DataFrame with cleaned data types
    """
    df_converted = df.copy()
    
    for col in df_converted.columns:
        # Handle object columns that might be numeric
        if df_converted[col].dtype == 'object':
            # Try to convert to numeric, keep as string if fails
            try:
                # First, handle any string representations of numbers
                numeric_series = pd.to_numeric(df_converted[col], errors='coerce')
                
                # If conversion successful for most values, use it
                if numeric_series.notna().sum() > len(df_converted) * 0.8:
                    df_converted[col] = numeric_series
                    print(f"   Converted '{col}' from object to numeric")
                else:
                    # Keep as string but ensure consistent encoding
                    df_converted[col] = df_converted[col].astype(str).replace('nan', np.nan)
                    print(f"   Kept '{col}' as string (object)")
            except:
                # Keep as string if conversion fails
                df_converted[col] = df_converted[col].astype(str).replace('nan', np.nan)
                print(f"   Kept '{col}' as string (conversion failed)")
    
    return df_converted

def ensure_json_serializable_types(df: pd.DataFrame) -> pd.DataFrame:
    """
    Ensure all DataFrame values are JSON serializable
    
    Args:
        df: DataFrame to convert
        
    Returns:
        pd.DataFrame: DataFrame with JSON-compatible types
    """
    df_json = df.copy()
    
    for col in df_json.columns:
        if df_json[col].dtype == 'int64':
            # Convert numpy int64 to int32 for JSON compatibility and memory efficiency
            df_json[col] = df_json[col].astype('Int32')  # Nullable integer
        elif df_json[col].dtype == 'float64':
            # Convert to float32 for memory efficiency
            df_json[col] = df_json[col].astype('float32')
        elif pd.api.types.is_categorical_dtype(df_json[col]):
            # Convert categorical to string
            df_json[col] = df_json[col].astype(str)
            print(f"   Converted categorical '{col}' to string")
    
    return df_json

def load_excel_sheet_with_types(filepath: Path, sheet_name: str) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Load Excel sheet with comprehensive type handling
    
    Args:
        filepath: Path to Excel file
        sheet_name: Name of sheet to load
        
    Returns:
        Tuple of (DataFrame, metadata dict)
    """
    print(f"\n Loading sheet: {sheet_name}")
    
    try:
        # Load with minimal dtype inference first
        df = pd.read_excel(filepath, sheet_name=sheet_name, dtype=str)
        original_shape = df.shape
        print(f"   Initial shape: {original_shape}")
        print(f"   Columns: {len(df.columns)}")
        
        # Create metadata
        metadata = {
            'sheet_name': sheet_name,
            'shape': df.shape,
            'columns': len(df.columns),
            'rows': len(df),
            'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
            'missing_values': df.isnull().sum().to_dict(),
            'memory_usage': df.memory_usage(deep=True).sum()
        }
        
        # Convert numpy types in metadata for JSON serialization
        metadata['missing_values'] = {
            k: int(v) if isinstance(v, (np.integer, np.int64)) else v 
            for k, v in metadata['missing_values'].items()
        }
        metadata['memory_usage'] = int(metadata['memory_usage'])
        
        print(f"   Processed shape: {df.shape}")
        print(f"   Memory usage: {metadata['memory_usage']:,} bytes")
        print(f"   Data types: {len(set(df.dtypes.astype(str)))} unique types")
        
        return df, metadata
        
    except Exception as e:
        print(f"   Error loading sheet '{sheet_name}': {e}")
        raise

# Load all three sheets
print(" Loading Excel sheets with type management...")

# Load TrainData
train_df, train_metadata = load_excel_sheet_with_types(excel_filepath, 'TrainData')

# Load TestData  
test_df, test_metadata = load_excel_sheet_with_types(excel_filepath, 'TestData')

# Create comprehensive loading report
loading_report = {
    'timestamp': pd.Timestamp.now().isoformat(),
    'source_file': str(excel_filepath),
    'sheets_loaded': {
        'train': train_metadata,
        'test': test_metadata,
    },
    'total_memory_usage': train_metadata['memory_usage'] + test_metadata['memory_usage']
}

print(f"\n LOADING SUMMARY:")
print(f"   Train data: {train_df.shape[0]:,} rows × {train_df.shape[1]} columns")
print(f"   Test data: {test_df.shape[0]:,} rows × {test_df.shape[1]} columns")  
print(f"   Total memory: {loading_report['total_memory_usage']:,} bytes")

# Quick data type validation
print(f"\n DATA TYPE VALIDATION:")
train_dtypes = set(train_df.dtypes.astype(str))
test_dtypes = set(test_df.dtypes.astype(str))
print(f"   Train data types: {train_dtypes}")
print(f"   Test data types: {test_dtypes}")
print(f"   Common types: {train_dtypes.intersection(test_dtypes)}")

if train_dtypes != test_dtypes:
    print(f"   Type differences detected - will address in preprocessing")
else:
    print(f"   Data types consistent between train and test")

 Loading Excel sheets with type management...

 Loading sheet: TrainData
   Initial shape: (53306, 105)
   Columns: 105
   Processed shape: (53306, 105)
   Memory usage: 339,197,364 bytes
   Data types: 1 unique types

 Loading sheet: TestData
   Initial shape: (10000, 104)
   Columns: 104
   Processed shape: (10000, 104)
   Memory usage: 63,102,081 bytes
   Data types: 1 unique types

 LOADING SUMMARY:
   Train data: 53,306 rows × 105 columns
   Test data: 10,000 rows × 104 columns
   Total memory: 402,299,445 bytes

 DATA TYPE VALIDATION:
   Train data types: {'object'}
   Test data types: {'object'}
   Common types: {'object'}
   Data types consistent between train and test


In [4]:
# =============================================================================
# FEATURE NAME SANITIZATION WITH TYPE SAFETY
# =============================================================================
# Purpose: Clean column names while preserving data integrity and uniqueness
# Handles: Special characters, duplicates, target variable, JSON serialization
# Output: Sanitized DataFrames and JSON-compatible feature mapping
# =============================================================================

def sanitize_column_name(col_name: str) -> str:
    """
    Sanitize individual column name following specific rules
    
    Args:
        col_name: Original column name
        
    Returns:
        str: Sanitized column name
    """
    if pd.isna(col_name) or col_name is None:
        return "unnamed_column"
    
    # Convert to string and handle special cases
    name = str(col_name).strip()
    
    # Handle target variable specifically
    if "Target_Variable" in name or "Total Income" in name or "TARGET VARIABLE" in name:
        return "target_income"
    
    # Convert to lowercase
    name = name.lower()
    
    # Replace special characters with underscores
    # Handle: /, (, ), &, spaces, dots, hyphens, etc.
    special_chars = r'[/\(\)&\s\.\-\+\*\%\$\#\@\!\?\[\]\{\}\|\\\:\;\,\<\>\=\^\~\`\'\"]'
    name = re.sub(special_chars, '_', name)
    
    # Remove multiple consecutive underscores
    name = re.sub(r'_+', '_', name)
    
    # Remove leading/trailing underscores
    name = name.strip('_')
    
    # Handle empty names
    if not name:
        name = "unnamed_column"
    
    # Ensure doesn't start with number
    if name and name[0].isdigit():
        name = f"col_{name}"
    
    # Truncate very long names
    # if len(name) > 50:
    #     name = name[:47] + "..."
    
    return name

def create_unique_column_names(columns: List[str]) -> Tuple[List[str], Dict[str, str]]:
    """
    Create unique column names and mapping dictionary
    
    Args:
        columns: List of original column names
        
    Returns:
        Tuple of (unique_names_list, mapping_dict)
    """
    sanitized_names = [sanitize_column_name(col) for col in columns]
    unique_names = []
    name_counts = {}
    mapping = {}
    
    for i, (original, sanitized) in enumerate(zip(columns, sanitized_names)):
        # Handle duplicates by adding suffix
        if sanitized in name_counts:
            name_counts[sanitized] += 1
            unique_name = f"{sanitized}_{name_counts[sanitized]}"
        else:
            name_counts[sanitized] = 0
            unique_name = sanitized
        
        unique_names.append(unique_name)
        mapping[str(original)] = unique_name  # Ensure key is string for JSON
    
    return unique_names, mapping

def apply_column_sanitization(df: pd.DataFrame, sheet_type: str) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Apply column name sanitization to DataFrame
    
    Args:
        df: DataFrame to sanitize
        sheet_type: Type identifier for the sheet
        
    Returns:
        Tuple of (sanitized_DataFrame, sanitization_metadata)
    """
    print(f"\n Sanitizing column names for {sheet_type} data...")
    
    original_columns = df.columns.tolist()
    unique_names, column_mapping = create_unique_column_names(original_columns)
    
    # Create new DataFrame with sanitized names
    df_sanitized = df.copy()
    df_sanitized.columns = unique_names
    
    # Create sanitization metadata
    sanitization_metadata = {
        'sheet_type': sheet_type,
        'original_column_count': len(original_columns),
        'sanitized_column_count': len(unique_names),
        'column_mapping': column_mapping,
        'duplicates_found': len(original_columns) - len(set(original_columns)),
        'target_variable_found': any('target_income' in name for name in unique_names),
        'longest_original_name': max(len(str(col)) for col in original_columns),
        'longest_sanitized_name': max(len(name) for name in unique_names)
    }
    
    print(f"   Original columns: {len(original_columns)}")
    print(f"   Sanitized columns: {len(unique_names)}")
    print(f"   Duplicates handled: {sanitization_metadata['duplicates_found']}")
    print(f"   Target variable found: {sanitization_metadata['target_variable_found']}")
    
    # Show sample mappings for verification
    print(f"   Sample mappings:")
    sample_mappings = list(column_mapping.items())[:5]
    # for orig, san in sample_mappings:
    #     if len(str(orig)) > 30:
    #         orig_display = str(orig)[:27] + "..."
    #     else:
    #         orig_display = str(orig)
    #     print(f"    '{orig_display}' → '{san}'")
    
    # return df_sanitized, sanitization_metadata

    for orig, san in sample_mappings:
        orig_display = str(orig)
        print(f"    '{orig_display}' → '{san}'")
    
    return df_sanitized, sanitization_metadata

def validate_column_consistency(train_cols: List[str], test_cols: List[str]) -> Dict[str, Any]:
    """
    Validate column consistency between train and test datasets
    
    Args:
        train_cols: List of training column names
        test_cols: List of test column names
        
    Returns:
        Dict: Validation results
    """
    train_set = set(train_cols)
    test_set = set(test_cols)
    
    validation_results = {
        'train_columns': len(train_cols),
        'test_columns': len(test_cols),
        'common_columns': len(train_set.intersection(test_set)),
        'train_only_columns': list(train_set - test_set),
        'test_only_columns': list(test_set - train_set),
        'columns_match': train_set == test_set,
        'target_in_train': 'target_income' in train_cols,
        'target_in_test': 'target_income' in test_cols
    }
    
    return validation_results

# Apply sanitization to all datasets
print(" Starting feature name sanitization process...")

# Sanitize training data
train_df_clean, train_sanitization = apply_column_sanitization(train_df, 'training')

# Sanitize test data
test_df_clean, test_sanitization = apply_column_sanitization(test_df, 'test')


# Validate column consistency
print(f"\n COLUMN CONSISTENCY VALIDATION:")
consistency_check = validate_column_consistency(
    train_df_clean.columns.tolist(),
    test_df_clean.columns.tolist()
)

print(f"   Train columns: {consistency_check['train_columns']}")
print(f"   Test columns: {consistency_check['test_columns']}")
print(f"   Common columns: {consistency_check['common_columns']}")
print(f"   Target in train: {consistency_check['target_in_train']}")
print(f"   Target in test: {consistency_check['target_in_test']}")

if not consistency_check['columns_match']:
    print(f"   Column mismatch detected:")
    if consistency_check['train_only_columns']:
        print(f"     Train-only: {consistency_check['train_only_columns'][:5]}")
    if consistency_check['test_only_columns']:
        print(f"     Test-only: {consistency_check['test_only_columns'][:5]}")
else:
    print(f"   Perfect column alignment!")

# Create comprehensive feature mapping for JSON serialization
comprehensive_mapping = {
    'timestamp': pd.Timestamp.now().isoformat(),
    'sanitization_summary': {
        'train': train_sanitization,
        'test': test_sanitization, 
    },
    'consistency_validation': consistency_check,
    'master_column_mapping': {
        'train': train_sanitization['column_mapping'],
        'test': test_sanitization['column_mapping'],
    }
}

print(f"\n Feature name sanitization completed successfully!")
print(f" Comprehensive mapping created with {len(comprehensive_mapping)} sections")

 Starting feature name sanitization process...

 Sanitizing column names for training data...
   Original columns: 105
   Sanitized columns: 105
   Duplicates handled: 0
   Target variable found: True
   Sample mappings:
    'FarmerID' → 'farmerid'
    'State' → 'state'
    'REGION' → 'region'
    'SEX' → 'sex'
    'CITY' → 'city'

 Sanitizing column names for test data...
   Original columns: 104
   Sanitized columns: 104
   Duplicates handled: 0
   Target variable found: False
   Sample mappings:
    'FarmerID' → 'farmerid'
    'State' → 'state'
    'REGION' → 'region'
    'SEX' → 'sex'
    'CITY' → 'city'

 COLUMN CONSISTENCY VALIDATION:
   Train columns: 105
   Test columns: 104
   Common columns: 104
   Target in train: True
   Target in test: False
   Column mismatch detected:
     Train-only: ['target_income']

 Feature name sanitization completed successfully!
 Comprehensive mapping created with 4 sections


In [5]:
# =============================================================================
# DATA VALIDATION WITH TYPE CHECKING AND QUALITY ASSESSMENT
# =============================================================================
# Purpose: Comprehensive data validation, type consistency, and quality assessment
# Handles: Data type issues, missing values, outliers, data quality metrics
# Output: Validation reports and data quality assessments with JSON compatibility
# =============================================================================

def analyze_column_data_types(df: pd.DataFrame, dataset_name: str) -> Dict[str, Any]:
    """
    Analyze data types and identify potential issues
    
    Args:
        df: DataFrame to analyze
        dataset_name: Name identifier for the dataset
        
    Returns:
        Dict: Comprehensive data type analysis
    """
    print(f"\n Analyzing data types for {dataset_name}...")
    
    dtype_analysis = {
        'dataset_name': dataset_name,
        'total_columns': len(df.columns),
        'data_types': {},
        'type_distribution': {},
        'problematic_columns': [],
        'mixed_type_columns': [],
        'high_cardinality_columns': [],
        'potential_categorical_columns': []
    }
    
    # Analyze each column
    for col in df.columns:
        col_info = {
            'dtype': str(df[col].dtype),
            'non_null_count': int(df[col].count()),
            'null_count': int(df[col].isnull().sum()),
            'null_percentage': float(df[col].isnull().sum() / len(df) * 100),
            'unique_values': int(df[col].nunique()),
            'cardinality_ratio': float(df[col].nunique() / len(df))
        }
        
        # Check for mixed types in object columns
        if df[col].dtype == 'object' and col_info['non_null_count'] > 0:
            sample_values = df[col].dropna().head(100)
            value_types = set(type(val).__name__ for val in sample_values)
            col_info['value_types'] = list(value_types)
            
            if len(value_types) > 1:
                dtype_analysis['mixed_type_columns'].append(col)
        
        # Identify high cardinality columns
        if col_info['cardinality_ratio'] > 0.95 and col_info['unique_values'] > 100:
            dtype_analysis['high_cardinality_columns'].append(col)
        
        # Identify potential categorical columns
        if (df[col].dtype == 'object' and 
            col_info['cardinality_ratio'] < 0.1 and 
            col_info['unique_values'] < 50):
            dtype_analysis['potential_categorical_columns'].append(col)
        
        # Flag problematic columns
        if col_info['null_percentage'] > 80:
            dtype_analysis['problematic_columns'].append({
                'column': col,
                'issue': 'high_missing_rate',
                'value': col_info['null_percentage']
            })
        
        dtype_analysis['data_types'][col] = col_info
    
    # Create type distribution summary
    type_counts = df.dtypes.value_counts()
    dtype_analysis['type_distribution'] = {
        str(dtype): int(count) for dtype, count in type_counts.items()
    }
    
    print(f"   Column analysis complete:")
    print(f"     Numeric columns: {sum(1 for col in df.columns if pd.api.types.is_numeric_dtype(df[col]))}")
    print(f"     Object columns: {sum(1 for col in df.columns if df[col].dtype == 'object')}")
    print(f"     Mixed type columns: {len(dtype_analysis['mixed_type_columns'])}")
    print(f"     Potential categorical: {len(dtype_analysis['potential_categorical_columns'])}")
    
    return dtype_analysis

def check_data_consistency(train_df: pd.DataFrame, test_df: pd.DataFrame) -> Dict[str, Any]:
    """
    Check data consistency between train and test datasets
    
    Args:
        train_df: Training DataFrame
        test_df: Test DataFrame
        
    Returns:
        Dict: Consistency check results
    """
    print(f"\n Checking train-test data consistency...")
    
    consistency_report = {
        'timestamp': pd.Timestamp.now().isoformat(),
        'shape_comparison': {
            'train_shape': list(train_df.shape),
            'test_shape': list(test_df.shape),
            'same_columns': train_df.shape[1] == test_df.shape[1]
        },
        'column_consistency': {},
        'dtype_consistency': {},
        'value_range_consistency': {},
        'categorical_consistency': {}
    }
    
    # Compare common columns
    common_cols = set(train_df.columns).intersection(set(test_df.columns))
    train_only = set(train_df.columns) - set(test_df.columns)
    test_only = set(test_df.columns) - set(train_df.columns)
    
    consistency_report['column_consistency'] = {
        'common_columns': len(common_cols),
        'train_only': list(train_only),
        'test_only': list(test_only),
        'perfect_match': len(train_only) == 0 and len(test_only) == 0
    }
    
    # Check data type consistency for common columns
    dtype_mismatches = []
    for col in common_cols:
        train_dtype = str(train_df[col].dtype)
        test_dtype = str(test_df[col].dtype)
        
        if train_dtype != test_dtype:
            dtype_mismatches.append({
                'column': col,
                'train_dtype': train_dtype,
                'test_dtype': test_dtype
            })
    
    consistency_report['dtype_consistency'] = {
        'matching_dtypes': len(common_cols) - len(dtype_mismatches),
        'mismatched_dtypes': len(dtype_mismatches),
        'mismatches': dtype_mismatches
    }
    
    # Check value ranges for numeric columns
    range_issues = []
    for col in common_cols:
        if pd.api.types.is_numeric_dtype(train_df[col]) and pd.api.types.is_numeric_dtype(test_df[col]):
            train_range = (float(train_df[col].min()), float(train_df[col].max()))
            test_range = (float(test_df[col].min()), float(test_df[col].max()))
            
            # Check if test range is within train range (important for model performance)
            if test_range[0] < train_range[0] or test_range[1] > train_range[1]:
                range_issues.append({
                    'column': col,
                    'train_range': train_range,
                    'test_range': test_range,
                    'test_outside_train_range': True
                })
    
    consistency_report['value_range_consistency'] = {
        'columns_checked': len([col for col in common_cols if pd.api.types.is_numeric_dtype(train_df[col])]),
        'range_issues': len(range_issues),
        'issues_detail': range_issues
    }
    
    print(f"   Consistency check results:")
    print(f"     Common columns: {len(common_cols)}")
    print(f"     Type mismatches: {len(dtype_mismatches)}")
    print(f"     Range issues: {len(range_issues)}")
    
    return consistency_report

def create_data_quality_report(df: pd.DataFrame, dataset_name: str) -> Dict[str, Any]:
    """
    Create comprehensive data quality report
    
    Args:
        df: DataFrame to assess
        dataset_name: Name of the dataset
        
    Returns:
        Dict: Data quality metrics
    """
    print(f"\n Creating data quality report for {dataset_name}...")
    
    # Basic statistics
    total_cells = df.shape[0] * df.shape[1]
    missing_cells = df.isnull().sum().sum()
    
    quality_report = {
        'dataset_name': dataset_name,
        'timestamp': pd.Timestamp.now().isoformat(),
        'basic_stats': {
            'rows': int(df.shape[0]),
            'columns': int(df.shape[1]),
            'total_cells': int(total_cells),
            'missing_cells': int(missing_cells),
            'missing_percentage': float(missing_cells / total_cells * 100),
            'memory_usage_mb': float(df.memory_usage(deep=True).sum() / 1024 / 1024)
        },
        'column_quality': {},
        'data_completeness': {},
        'potential_issues': []
    }
    
    # Per-column quality metrics
    for col in df.columns:
        col_quality = {
            'dtype': str(df[col].dtype),
            'missing_count': int(df[col].isnull().sum()),
            'missing_percentage': float(df[col].isnull().sum() / len(df) * 100),
            'unique_values': int(df[col].nunique()),
            'completeness_score': float((len(df) - df[col].isnull().sum()) / len(df))
        }
        
        # Add numeric-specific metrics
        if pd.api.types.is_numeric_dtype(df[col]):
            col_quality.update({
                'mean': float(df[col].mean()) if df[col].count() > 0 else None,
                'std': float(df[col].std()) if df[col].count() > 0 else None,
                'min': float(df[col].min()) if df[col].count() > 0 else None,
                'max': float(df[col].max()) if df[col].count() > 0 else None,
                'zeros_count': int((df[col] == 0).sum()),
                'negative_count': int((df[col] < 0).sum()) if df[col].count() > 0 else 0
            })
        
        quality_report['column_quality'][col] = col_quality
        
        # Flag potential issues
        if col_quality['missing_percentage'] > 50:
            quality_report['potential_issues'].append(f"High missing rate in '{col}': {col_quality['missing_percentage']:.1f}%")
        
        if col_quality['unique_values'] == 1:
            quality_report['potential_issues'].append(f"Constant column detected: '{col}'")
    
    # Data completeness summary
    missing_by_column = df.isnull().sum().sort_values(ascending=False)
    quality_report['data_completeness'] = {
        'columns_with_missing': int((missing_by_column > 0).sum()),
        'complete_columns': int((missing_by_column == 0).sum()),
        'worst_columns': {
            col: int(count) for col, count in missing_by_column.head(5).items()
        }
    }
    
    print(f"   Quality assessment complete:")
    print(f"     Overall completeness: {100 - quality_report['basic_stats']['missing_percentage']:.1f}%")
    print(f"     Potential issues: {len(quality_report['potential_issues'])}")
    print(f"     Memory usage: {quality_report['basic_stats']['memory_usage_mb']:.1f} MB")
    
    return quality_report

# Perform comprehensive data validation
print(" Starting comprehensive data validation and quality assessment...")

# Analyze data types for each dataset
train_dtype_analysis = analyze_column_data_types(train_df_clean, 'training')
test_dtype_analysis = analyze_column_data_types(test_df_clean, 'test')

# Check consistency between train and test
consistency_report = check_data_consistency(train_df_clean, test_df_clean)

# Create quality reports
train_quality_report = create_data_quality_report(train_df_clean, 'training')
test_quality_report = create_data_quality_report(test_df_clean, 'test')

# Create comprehensive validation summary
validation_summary = {
    'timestamp': pd.Timestamp.now().isoformat(),
    'validation_status': 'completed',
    'datasets_analyzed': ['training', 'test'],
    'dtype_analysis': {
        'train': train_dtype_analysis,
        'test': test_dtype_analysis
    },
    'consistency_check': consistency_report,
    'quality_reports': {
        'train': train_quality_report,
        'test': test_quality_report
    },
    'overall_assessment': {
        'data_ready_for_processing': True,
        'major_issues_found': len(train_quality_report['potential_issues']) + len(test_quality_report['potential_issues']),
        'consistency_score': consistency_report['dtype_consistency']['matching_dtypes'] / max(1, len(set(train_df_clean.columns).intersection(set(test_df_clean.columns)))),
        'recommendation': 'Proceed with preprocessing - address identified issues during feature engineering'
    }
}

print(f"\n VALIDATION SUMMARY:")
print(f"   Analysis completed for {len(validation_summary['datasets_analyzed'])} datasets")
print(f"   Consistency score: {validation_summary['overall_assessment']['consistency_score']:.2%}")
print(f"   Issues identified: {validation_summary['overall_assessment']['major_issues_found']}")
print(f"   Ready for processing: {validation_summary['overall_assessment']['data_ready_for_processing']}")
print(f"   {validation_summary['overall_assessment']['recommendation']}")

 Starting comprehensive data validation and quality assessment...

 Analyzing data types for training...
   Column analysis complete:
     Numeric columns: 0
     Object columns: 105
     Mixed type columns: 0
     Potential categorical: 30

 Analyzing data types for test...
   Column analysis complete:
     Numeric columns: 0
     Object columns: 104
     Mixed type columns: 0
     Potential categorical: 30

 Checking train-test data consistency...
   Consistency check results:
     Common columns: 104
     Type mismatches: 0
     Range issues: 0

 Creating data quality report for training...
   Quality assessment complete:
     Overall completeness: 98.5%
     Potential issues: 0
     Memory usage: 323.5 MB

 Creating data quality report for test...
   Quality assessment complete:
     Overall completeness: 98.5%
     Potential issues: 0
     Memory usage: 60.2 MB

 VALIDATION SUMMARY:
   Analysis completed for 2 datasets
   Consistency score: 100.00%
   Issues identified: 0
   Ready

In [6]:
# =============================================================================
# OUTPUT GENERATION WITH PROPER SERIALIZATION
# =============================================================================
# Purpose: Save all processed data with consistent formatting and serialization
# Handles: CSV output, pickle files, JSON reports, type conversion for serialization
# Output: Clean datasets ready for preprocessing, comprehensive documentation
# =============================================================================

def save_dataframe_with_types(df: pd.DataFrame, filepath: Path, dataset_name: str) -> Dict[str, Any]:
    """
    Save DataFrame with proper type handling and create metadata
    
    Args:
        df: DataFrame to save
        filepath: Output file path
        dataset_name: Name identifier for the dataset
        
    Returns:
        Dict: Save operation metadata
    """
    print(f" Saving {dataset_name} data to {filepath.name}...")
    
    try:
        # Ensure directory exists
        filepath.parent.mkdir(parents=True, exist_ok=True)
        
        # Save with proper encoding and index handling
        df.to_csv(filepath, index=False, encoding='utf-8', float_format='%.6f')
        
        # Create metadata
        file_metadata = {
            'filename': filepath.name,
            'filepath': str(filepath),
            'dataset_name': dataset_name,
            'rows': int(df.shape[0]),
            'columns': int(df.shape[1]),
            'file_size_bytes': int(filepath.stat().st_size),
            'file_size_mb': float(filepath.stat().st_size / 1024 / 1024),
            'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
            'save_timestamp': pd.Timestamp.now().isoformat(),
            'encoding': 'utf-8',
            'index_saved': False
        }
        
        print(f"   Saved successfully: {file_metadata['file_size_mb']:.2f} MB")
        return file_metadata
        
    except Exception as e:
        print(f"   Error saving {dataset_name}: {e}")
        raise

def save_json_with_serialization(data: Dict[str, Any], filepath: Path, data_name: str) -> Dict[str, Any]:
    """
    Save dictionary as JSON with proper type serialization
    
    Args:
        data: Dictionary to save
        filepath: Output file path
        data_name: Name identifier for the data
        
    Returns:
        Dict: Save operation metadata
    """
    print(f" Saving {data_name} to {filepath.name}...")
    
    def make_json_serializable(obj):
        """Recursively convert objects to JSON serializable types"""
        if isinstance(obj, (np.integer, np.int64, np.int32)):
            return int(obj)
        elif isinstance(obj, (np.floating, np.float64, np.float32)):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, pd.Series):
            return obj.tolist()
        elif isinstance(obj, pd.Timestamp):
            return obj.isoformat()
        elif isinstance(obj, dict):
            return {key: make_json_serializable(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [make_json_serializable(item) for item in obj]
        else:
            return obj
    
    try:
        # Ensure directory exists
        filepath.parent.mkdir(parents=True, exist_ok=True)
        
        # Convert data to JSON serializable format
        serializable_data = make_json_serializable(data)
        
        # Save with proper formatting
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(serializable_data, f, indent=2, ensure_ascii=False)
        
        # Create metadata
        save_metadata = {
            'filename': filepath.name,
            'filepath': str(filepath),
            'data_name': data_name,
            'file_size_bytes': int(filepath.stat().st_size),
            'file_size_kb': float(filepath.stat().st_size / 1024),
            'save_timestamp': pd.Timestamp.now().isoformat(),
            'encoding': 'utf-8',
            'json_formatted': True
        }
        
        print(f"   JSON saved successfully: {save_metadata['file_size_kb']:.1f} KB")
        return save_metadata
        
    except Exception as e:
        print(f"   Error saving JSON {data_name}: {e}")
        raise

def save_pickle_with_protocol(data: Any, filepath: Path, data_name: str) -> Dict[str, Any]:
    """
    Save data as pickle with proper protocol handling
    
    Args:
        data: Data to pickle
        filepath: Output file path
        data_name: Name identifier for the data
        
    Returns:
        Dict: Save operation metadata
    """
    print(f" Saving {data_name} to {filepath.name}...")
    
    try:
        # Ensure directory exists
        filepath.parent.mkdir(parents=True, exist_ok=True)
        
        # Save with protocol 4 for Python 3.4+ compatibility
        with open(filepath, 'wb') as f:
            pickle.dump(data, f, protocol=4)
        
        # Create metadata
        save_metadata = {
            'filename': filepath.name,
            'filepath': str(filepath),
            'data_name': data_name,
            'file_size_bytes': int(filepath.stat().st_size),
            'file_size_kb': float(filepath.stat().st_size / 1024),
            'save_timestamp': pd.Timestamp.now().isoformat(),
            'pickle_protocol': 4,
            'data_type': str(type(data).__name__)
        }
        
        print(f"   Pickle saved successfully: {save_metadata['file_size_kb']:.1f} KB")
        return save_metadata
        
    except Exception as e:
        print(f"   Error saving pickle {data_name}: {e}")
        raise

# Start output generation process
print(" Starting output generation with proper serialization...")

# Initialize save metadata tracking
save_operations = {
    'timestamp': pd.Timestamp.now().isoformat(),
    'operations': [],
    'files_created': [],
    'total_size_mb': 0
}

# 1. Save cleaned CSV files
print("\n Saving processed CSV files...")

# Save training data
train_save_meta = save_dataframe_with_types(
    train_df_clean, 
    RAW_DATA_DIR / "train_raw.csv", 
    "training"
)
save_operations['operations'].append(train_save_meta)
save_operations['files_created'].append(train_save_meta['filepath'])

# Save test data
test_save_meta = save_dataframe_with_types(
    test_df_clean, 
    RAW_DATA_DIR / "test_raw.csv", 
    "test"
)
save_operations['operations'].append(test_save_meta)
save_operations['files_created'].append(test_save_meta['filepath'])

# 2. Save feature mapping as pickle
print("\n Saving feature mapping...")
mapping_save_meta = save_pickle_with_protocol(
    comprehensive_mapping,
    RAW_DATA_DIR / "feature_mapping.pkl",
    "feature_mapping"
)
save_operations['operations'].append(mapping_save_meta)
save_operations['files_created'].append(mapping_save_meta['filepath'])

# 3. Save validation summary as JSON
print("\n Saving validation summary...")
validation_save_meta = save_json_with_serialization(
    validation_summary,
    RAW_DATA_DIR / "validation_summary.json",
    "validation_summary"
)
save_operations['operations'].append(validation_save_meta)
save_operations['files_created'].append(validation_save_meta['filepath'])

# 4. Create and save comprehensive loading report
print("\n Creating comprehensive loading report...")
final_loading_report = {
    'project_info': {
        'objective': 'L&T Finance Pearl Challenge - Farmer Income Prediction',
        'target_metric': 'MAPE < 18%',
        'data_source': str(excel_filepath),
        'processing_timestamp': pd.Timestamp.now().isoformat()
    },
    'data_summary': {
        'training_data': {
            'rows': int(train_df_clean.shape[0]),
            'columns': int(train_df_clean.shape[1]),
            'target_variable': 'target_income' if 'target_income' in train_df_clean.columns else 'not_found',
            'memory_mb': float(train_df_clean.memory_usage(deep=True).sum() / 1024 / 1024)
        },
        'test_data': {
            'rows': int(test_df_clean.shape[0]),
            'columns': int(test_df_clean.shape[1]),
            'memory_mb': float(test_df_clean.memory_usage(deep=True).sum() / 1024 / 1024)
        },
    },
    'processing_summary': {
        'feature_sanitization': {
            'train_columns_processed': len(train_sanitization['column_mapping']),
            'test_columns_processed': len(test_sanitization['column_mapping']),
            'target_variable_found': validation_summary['overall_assessment']['data_ready_for_processing']
        },
        'data_validation': {
            'consistency_score': float(validation_summary['overall_assessment']['consistency_score']),
            'issues_found': int(validation_summary['overall_assessment']['major_issues_found']),
            'ready_for_processing': validation_summary['overall_assessment']['data_ready_for_processing']
        }
    },
    'file_outputs': save_operations,
    'next_steps': [
        'Run 02_exploratory_data_analysis.ipynb for comprehensive EDA',
        'Proceed with 03_preprocessing_feature_eng.ipynb for data preprocessing',
        'Address identified data quality issues during preprocessing',
        'Validate target variable distribution and outliers'
    ]
}

# Calculate total file size
total_size_mb = sum(op.get('file_size_mb', op.get('file_size_kb', 0) / 1024) for op in save_operations['operations'])
save_operations['total_size_mb'] = float(total_size_mb)
final_loading_report['file_outputs']['total_size_mb'] = float(total_size_mb)

# Save final loading report
report_save_meta = save_json_with_serialization(
    final_loading_report,
    RAW_DATA_DIR / "loading_report.json",
    "final_loading_report"
)
save_operations['operations'].append(report_save_meta)
save_operations['files_created'].append(report_save_meta['filepath'])

# 5. Display final summary
print(f"\n DATA LOADING AND CONVERSION COMPLETED SUCCESSFULLY!")
print(f"=" * 70)
print(f" PROCESSING SUMMARY:")
print(f"   Training data: {train_df_clean.shape[0]:,} rows × {train_df_clean.shape[1]} columns")
print(f"   Test data: {test_df_clean.shape[0]:,} rows × {test_df_clean.shape[1]} columns")
print(f"   Target variable: {' Found' if 'target_income' in train_df_clean.columns else ' Not found'}")

print(f"\n FILES CREATED:")
for i, filepath in enumerate(save_operations['files_created'], 1):
    print(f"  {i}. {Path(filepath).name}")

print(f"\n STORAGE SUMMARY:")
print(f"   Total files: {len(save_operations['files_created'])}")
print(f"   Total size: {save_operations['total_size_mb']:.2f} MB")
print(f"   Output directory: {RAW_DATA_DIR}")

print(f"\n READY FOR NEXT STEPS:")
print(f"   Data loaded with proper type handling")
print(f"   Feature names sanitized and mapped") 
print(f"   Data validation completed")
print(f"   All outputs properly serialized")
print(f"   Consistency score: {validation_summary['overall_assessment']['consistency_score']:.1%}")
print(f"   Next: Run 02_exploratory_data_analysis.ipynb")

print(f"=" * 70)

 Starting output generation with proper serialization...

 Saving processed CSV files...
 Saving training data to train_raw.csv...
   Saved successfully: 68.77 MB
 Saving test data to test_raw.csv...
   Saved successfully: 12.86 MB

 Saving feature mapping...
 Saving feature_mapping to feature_mapping.pkl...
   Pickle saved successfully: 18.5 KB

 Saving validation summary...
 Saving validation_summary to validation_summary.json...
   JSON saved successfully: 124.1 KB

 Creating comprehensive loading report...
 Saving final_loading_report to loading_report.json...
   JSON saved successfully: 16.4 KB

 DATA LOADING AND CONVERSION COMPLETED SUCCESSFULLY!
 PROCESSING SUMMARY:
   Training data: 53,306 rows × 105 columns
   Test data: 10,000 rows × 104 columns
   Target variable:  Found

 FILES CREATED:
  1. train_raw.csv
  2. test_raw.csv
  3. feature_mapping.pkl
  4. validation_summary.json
  5. loading_report.json

 STORAGE SUMMARY:
   Total files: 5
   Total size: 81.76 MB
   Output dir