In [1]:
# Essential data types for AI applications
customer_id = 12345                    # Integer: unique identifiers
customer_name = "Sarah Chen"           # String: text data for processing
purchase_amount = 89.99               # Float: numerical data for calculations
is_premium_member = True              # Boolean: categorical flags
purchase_history = [45.99, 23.50, 12.99, 67.80]  # List: sequential data
customer_profile = {                  # Dictionary: structured data
    "age": 28,
    "location": "San Francisco",
    "preferences": ["electronics", "books"],
    "last_purchase": "2024-06-15"
}

# Type checking - crucial for AI data validation
print(f"Customer ID type: {type(customer_id)}")
print(f"Purchase history length: {len(purchase_history)}")
print(f"Profile keys: {list(customer_profile.keys())}")

Customer ID type: <class 'int'>
Purchase history length: 4
Profile keys: ['age', 'location', 'preferences', 'last_purchase']


In [2]:
# Functions: Building blocks of AI data processing
def calculate_customer_value(purchase_history, membership_multiplier=1.2):
    """Calculate customer lifetime value - essential for recommendation systems"""
    if not purchase_history:
        return 0.0
    
    total_spent = sum(purchase_history)
    average_purchase = total_spent / len(purchase_history)
    
    # Apply membership bonus (common in AI scoring systems)
    if membership_multiplier > 1.0:
        total_spent *= membership_multiplier
    
    return round(total_spent, 2)

print(purchase_history)

[45.99, 23.5, 12.99, 67.8]


In [3]:
# Classes: Organizing complex AI logic
class CustomerAnalyzer:
    """Encapsulates customer analysis logic for AI recommendations"""
    
    def __init__(self, premium_threshold=100.0):
        self.premium_threshold = premium_threshold
        self.analyzed_customers = 0
    
    def analyze_customer(self, customer_data):
        """Process customer data for AI model input"""
        try:
            # Extract features (common AI preprocessing step)
            total_value = calculate_customer_value(
                customer_data.get('purchase_history', []),
                1.2 if customer_data.get('is_premium', False) else 1.0
            )
            
            # Categorize customer (feature engineering for AI)
            category = "high_value" if total_value > self.premium_threshold else "standard"
            
            self.analyzed_customers += 1
            
            return {
                "customer_value": total_value,
                "category": category,
                "feature_count": len(customer_data.get('preferences', [])),
                "processed_at": "2024-06-26"
            }
        
        except Exception as e:
            print(f"Error processing customer: {e}")
            return None
    
    def get_stats(self):
        """Return processing statistics"""
        return f"Analyzed {self.analyzed_customers} customers"

# Usage example - this is how you'd use it in an AI pipeline
analyzer = CustomerAnalyzer(premium_threshold=75.0)
sample_customer = {
    'purchase_history': [45.99, 23.50, 67.80],
    'is_premium': True,
    'preferences': ['electronics', 'books', 'music']
}
result = analyzer.analyze_customer(sample_customer)
print(f"Analysis result: {result}")
print(analyzer.get_stats())

Analysis result: {'customer_value': 164.75, 'category': 'high_value', 'feature_count': 3, 'processed_at': '2024-06-26'}
Analyzed 1 customers


In [4]:
# Sample dataset - typical AI training data structure
customers_data = [
    {"id": 1, "purchases": [45.99, 23.50], "age": 28, "location": "SF"},
    {"id": 2, "purchases": [12.99, 8.50, 15.75], "age": 34, "location": "NYC"},
    {"id": 3, "purchases": [99.99], "age": 22, "location": "SF"},
    {"id": 4, "purchases": [5.99, 7.25, 12.50, 8.75], "age": 45, "location": "LA"},
    {"id": 5, "purchases": [], "age": 31, "location": "NYC"}
]

In [5]:
# List comprehensions: Essential for data preprocessing in AI

# Extract total spending per customer (feature extraction)
customer_totals = [sum(customer["purchases"]) for customer in customers_data]
print(f"Customer spending totals: {customer_totals}")

# Filter high-value customers (data filtering for AI models)
high_value_customers = [
    customer for customer in customers_data 
    if sum(customer["purchases"]) > 50.0
]
print(f"High-value customers: {len(high_value_customers)}")
# Create feature vectors (common AI preprocessing step)
feature_vectors = [
    {
        "id": customer["id"],
        "total_spent": sum(customer["purchases"]),
        "purchase_frequency": len(customer["purchases"]),
        "avg_purchase": sum(customer["purchases"]) / len(customer["purchases"]) if customer["purchases"] else 0,
        "age_category": "young" if customer["age"] < 30 else "mature"
    }
    for customer in customers_data
]
# Lambda functions: Quick data transformations
# Sort customers by value (ranking for recommendation priority)
sorted_customers = sorted(customers_data, 
                         key=lambda x: sum(x["purchases"]), 
                         reverse=True)
# Apply discount calculation (business logic in AI systems)
apply_discount = lambda amount: amount * 0.9 if amount > 50 else amount
discounted_totals = list(map(apply_discount, customer_totals))
print(f"Original totals: {customer_totals}")
print(f"After discount: {discounted_totals}")
# Advanced: Nested comprehensions for complex data structures
location_summary = {
    location: [
        sum(customer["purchases"]) 
        for customer in customers_data 
        if customer["location"] == location
    ]
    for location in set(customer["location"] for customer in customers_data)
}
print(f"Spending by location: {location_summary}")

Customer spending totals: [69.49000000000001, 37.24, 99.99, 34.49, 0]
High-value customers: 2
Original totals: [69.49000000000001, 37.24, 99.99, 34.49, 0]
After discount: [62.54100000000001, 37.24, 89.991, 34.49, 0]
Spending by location: {'SF': [69.49000000000001, 99.99], 'LA': [34.49], 'NYC': [37.24, 0]}


In [6]:
import json
from datetime import datetime

# File I/O: Essential for AI data loading and model persistence
def save_customer_analysis(analysis_results, filename="customer_analysis.json"):
    """Save analysis results - common in AI model outputs"""
    try:
        with open(filename, 'w') as file:
            json.dump(analysis_results, file, indent=2, default=str)
        print(f"Analysis saved to {filename}")
    except Exception as e:
        print(f"Error saving analysis: {e}")

def load_customer_data(filename="customers.json"):
    """Load customer data - typical AI data input process"""
    try:
        with open(filename, 'r') as file:
            data = json.load(file)
        print(f"Loaded {len(data)} customer records")
        return data
    except FileNotFoundError:
        print(f"File {filename} not found. Creating sample data...")
        return create_sample_data()
    except json.JSONDecodeError as e:
        print(f"Invalid JSON format: {e}")
        return []
    
def create_sample_data():
    """Create sample dataset - useful for AI experimentation"""
    sample_data = {
        "customers": customers_data,
        "metadata": {
            "created_at": datetime.now().isoformat(),
            "total_customers": len(customers_data),
            "data_version": "1.0"
        }
    }
    
    # Save sample data for future use
    with open("customers.json", 'w') as file:
        json.dump(sample_data, file, indent=2, default=str)
    
    return sample_data

# Complete AI data processing pipeline
def process_ai_dataset():
    """Complete data processing pipeline for AI applications"""
    
    # 1. Load data (typical AI workflow start)
    raw_data = load_customer_data()
    customers = raw_data.get("customers", []) if isinstance(raw_data, dict) else raw_data
    
    # 2. Initialize analyzer
    analyzer = CustomerAnalyzer()
    
    # 3. Process each customer (feature engineering)
    analysis_results = []
    for customer in customers:
        result = analyzer.analyze_customer(customer)
        if result:
            result["customer_id"] = customer["id"]
            analysis_results.append(result)
    
    # 4. Generate summary statistics (model evaluation metrics)
    summary = {
        "total_processed": len(analysis_results),
        "high_value_count": sum(1 for r in analysis_results if r["category"] == "high_value"),
        "average_value": sum(r["customer_value"] for r in analysis_results) / len(analysis_results) if analysis_results else 0,
        "processing_timestamp": datetime.now().isoformat()
    }
    
    # 5. Save results (model output persistence)
    final_output = {
        "analysis_results": analysis_results,
        "summary": summary
    }
    save_customer_analysis(final_output)
    
    return final_output

# Run the complete pipeline
if __name__ == "__main__":
    results = process_ai_dataset()
    print(f"\nPipeline completed! Processed {results['summary']['total_processed']} customers.")
    print(f"High-value customers: {results['summary']['high_value_count']}")
    print(f"Average customer value: ${results['summary']['average_value']:.2f}")

Loaded 2 customer records
Analysis saved to customer_analysis.json

Pipeline completed! Processed 5 customers.
High-value customers: 0
Average customer value: $0.00


In [7]:
# Module 1.1 Exercise Solutions
# AI Unleashed for Developers - Python Essentials for AI

import json
from datetime import datetime, timedelta
import random

# Exercise 1: Customer Segmentation Features
# Objective: Learn to extract features for AI customer segmentation

class CustomerAnalyzer:
    """Enhanced CustomerAnalyzer with behavioral features for AI segmentation"""
    
    def __init__(self, premium_threshold=100.0):
        self.premium_threshold = premium_threshold
        self.analyzed_customers = 0
    
    def analyze_customer(self, customer_data):
        """Process customer data for AI model input"""
        try:
            # Extract features (common AI preprocessing step)
            total_value = self.calculate_customer_value(
                customer_data.get('purchase_history', []),
                1.2 if customer_data.get('is_premium', False) else 1.0
            )
            
            # Categorize customer (feature engineering for AI)
            category = "high_value" if total_value > self.premium_threshold else "standard"
            
            self.analyzed_customers += 1
            
            return {
                "customer_value": total_value,
                "category": category,
                "feature_count": len(customer_data.get('preferences', [])),
                "processed_at": datetime.now().isoformat()
            }
        
        except Exception as e:
            print(f"Error processing customer: {e}")
            return None
    
    def calculate_customer_value(self, purchase_history, membership_multiplier=1.2):
        """Calculate customer lifetime value - essential for recommendation systems"""
        if not purchase_history:
            return 0.0
        
        total_spent = sum(purchase_history)
        
        # Apply membership bonus (common in AI scoring systems)
        if membership_multiplier > 1.0:
            total_spent *= membership_multiplier
        
        return round(total_spent, 2)
    
    # SOLUTION: Exercise 1 - Enhanced behavioral features
    def extract_behavioral_features(self, customer_data):
        """
        Extract advanced behavioral features for AI customer segmentation
        
        This method demonstrates feature engineering techniques commonly used
        in production recommendation systems and customer analytics.
        """
        features = {}
        
        # Basic customer info
        customer_id = customer_data.get('id', 'unknown')
        purchase_history = customer_data.get('purchase_history', [])
        preferences = customer_data.get('preferences', [])
        last_purchase_date = customer_data.get('last_purchase_date')
        age = customer_data.get('age', 0)
        location = customer_data.get('location', 'unknown')
        
        # Feature 1: Days since last purchase (recency feature)
        if last_purchase_date:
            try:
                last_date = datetime.fromisoformat(last_purchase_date)
                days_since_last = (datetime.now() - last_date).days
                features['days_since_last_purchase'] = days_since_last
                
                # Recency category (common in RFM analysis for AI)
                if days_since_last <= 30:
                    features['recency_category'] = 'recent'
                elif days_since_last <= 90:
                    features['recency_category'] = 'moderate'
                else:
                    features['recency_category'] = 'distant'
            except:
                features['days_since_last_purchase'] = 999  # Unknown/very old
                features['recency_category'] = 'unknown'
        else:
            features['days_since_last_purchase'] = 999
            features['recency_category'] = 'unknown'
        
        # Feature 2: Purchase diversity score (how varied are their purchases)
        if purchase_history:
            # Calculate coefficient of variation (std dev / mean)
            if len(purchase_history) > 1:
                mean_purchase = sum(purchase_history) / len(purchase_history)
                variance = sum((x - mean_purchase) ** 2 for x in purchase_history) / len(purchase_history)
                std_dev = variance ** 0.5
                
                # Diversity score: higher means more varied purchasing behavior
                diversity_score = std_dev / mean_purchase if mean_purchase > 0 else 0
                features['purchase_diversity_score'] = round(diversity_score, 3)
                
                # Categorize diversity (useful for recommendation algorithms)
                if diversity_score < 0.3:
                    features['purchase_pattern'] = 'consistent'
                elif diversity_score < 0.7:
                    features['purchase_pattern'] = 'moderate'
                else:
                    features['purchase_pattern'] = 'exploratory'
            else:
                features['purchase_diversity_score'] = 0
                features['purchase_pattern'] = 'single_purchase'
        else:
            features['purchase_diversity_score'] = 0
            features['purchase_pattern'] = 'no_purchases'
        
        # Feature 3: Seasonal preference analysis
        # Note: In real applications, you'd have actual purchase dates with seasons
        # This simulates seasonal preference based on purchase amounts and patterns
        if purchase_history:
            # Simulate seasonal data based on purchase patterns
            avg_purchase = sum(purchase_history) / len(purchase_history)
            
            # Higher average purchases might indicate luxury/holiday buying
            if avg_purchase > 75:
                features['seasonal_preference'] = 'holiday_buyer'
            elif len(purchase_history) > 5:  # Frequent purchases
                features['seasonal_preference'] = 'year_round'
            else:
                features['seasonal_preference'] = 'occasional'
        else:
            features['seasonal_preference'] = 'inactive'
        
        # Feature 4: Customer lifecycle stage (based on multiple factors)
        total_purchases = len(purchase_history)
        total_spent = sum(purchase_history) if purchase_history else 0
        
        if total_purchases == 0:
            lifecycle_stage = 'prospect'
        elif total_purchases == 1:
            lifecycle_stage = 'new_customer'
        elif total_purchases < 5 and total_spent < 200:
            lifecycle_stage = 'developing'
        elif total_spent > 500 or total_purchases > 10:
            lifecycle_stage = 'loyal'
        else:
            lifecycle_stage = 'established'
        
        features['lifecycle_stage'] = lifecycle_stage
        
        # Feature 5: Preference breadth (important for recommendation systems)
        features['preference_breadth'] = len(preferences)
        if len(preferences) == 0:
            features['preference_category'] = 'undefined'
        elif len(preferences) <= 2:
            features['preference_category'] = 'focused'
        elif len(preferences) <= 4:
            features['preference_category'] = 'moderate'
        else:
            features['preference_category'] = 'broad'
        
        # Feature 6: Age-based segments (demographic feature)
        if age < 25:
            features['age_segment'] = 'gen_z'
        elif age < 40:
            features['age_segment'] = 'millennial'
        elif age < 55:
            features['age_segment'] = 'gen_x'
        else:
            features['age_segment'] = 'boomer'
        
        # Feature 7: Geographic category (location-based features)
        metro_areas = ['SF', 'NYC', 'LA', 'Chicago', 'Boston', 'Seattle']
        features['location_type'] = 'metro' if location in metro_areas else 'other'
        
        return features

    def get_stats(self):
        """Return processing statistics"""
        return f"Analyzed {self.analyzed_customers} customers"


In [8]:
# Exercise 2: Data Pipeline Error Handling
# Objective: Build robust data processing for production AI systems

class RobustDataPipeline:
    """
    Enhanced data pipeline with comprehensive error handling for production AI systems.
    Demonstrates best practices for handling real-world data inconsistencies.
    """
    
    def __init__(self, log_file="data_pipeline.log"):
        self.log_file = log_file
        self.error_count = 0
        self.processed_count = 0
        self.warning_count = 0
        
    def log_message(self, level, message):
        """Log messages with timestamp for debugging"""
        timestamp = datetime.now().isoformat()
        log_entry = f"[{timestamp}] {level}: {message}\n"
        
        try:
            with open(self.log_file, 'a') as f:
                f.write(log_entry)
        except:
            # If we can't write to log file, at least print to console
            print(f"LOG: {log_entry.strip()}")
        
        # Also print to console for immediate feedback
        print(f"{level}: {message}")
    
    def validate_customer_data(self, customer_data):
        """
        Validate customer data structure and content.
        Returns (is_valid, cleaned_data, warnings)
        """
        warnings = []
        cleaned_data = {}
        
        # Check if data is a dictionary
        if not isinstance(customer_data, dict):
            return False, None, ["Data is not a dictionary"]
        
        # Validate and clean required fields
        # Customer ID
        customer_id = customer_data.get('id')
        if customer_id is None:
            return False, None, ["Missing customer ID"]
        
        try:
            cleaned_data['id'] = int(customer_id)
        except (ValueError, TypeError):
            return False, None, ["Invalid customer ID format"]
        
        # Purchase history
        purchase_history = customer_data.get('purchase_history', [])
        if not isinstance(purchase_history, list):
            warnings.append("Purchase history is not a list, converting to empty list")
            purchase_history = []
        
        # Clean purchase amounts
        cleaned_purchases = []
        for purchase in purchase_history:
            try:
                amount = float(purchase)
                if amount >= 0:  # Negative purchases don't make sense
                    cleaned_purchases.append(amount)
                else:
                    warnings.append(f"Negative purchase amount {amount} removed")
            except (ValueError, TypeError):
                warnings.append(f"Invalid purchase amount {purchase} removed")
        
        cleaned_data['purchase_history'] = cleaned_purchases
        
        # Age validation
        age = customer_data.get('age')
        if age is not None:
            try:
                age_int = int(age)
                if 13 <= age_int <= 120:  # Reasonable age range
                    cleaned_data['age'] = age_int
                else:
                    warnings.append(f"Age {age_int} outside reasonable range, set to None")
                    cleaned_data['age'] = None
            except (ValueError, TypeError):
                warnings.append(f"Invalid age format {age}, set to None")
                cleaned_data['age'] = None
        else:
            cleaned_data['age'] = None
        
        # Location validation
        location = customer_data.get('location', 'unknown')
        if isinstance(location, str) and location.strip():
            cleaned_data['location'] = location.strip()
        else:
            cleaned_data['location'] = 'unknown'
            warnings.append("Invalid or empty location, set to 'unknown'")
        
        # Preferences validation
        preferences = customer_data.get('preferences', [])
        if isinstance(preferences, list):
            # Clean and validate each preference
            cleaned_prefs = []
            for pref in preferences:
                if isinstance(pref, str) and pref.strip():
                    cleaned_prefs.append(pref.strip().lower())
            cleaned_data['preferences'] = cleaned_prefs
        else:
            warnings.append("Preferences not a list, set to empty list")
            cleaned_data['preferences'] = []
        
        # Premium membership
        is_premium = customer_data.get('is_premium', False)
        cleaned_data['is_premium'] = bool(is_premium)
        
        # Last purchase date
        last_purchase = customer_data.get('last_purchase_date')
        if last_purchase:
            try:
                # Try to parse the date
                datetime.fromisoformat(last_purchase)
                cleaned_data['last_purchase_date'] = last_purchase
            except:
                warnings.append("Invalid last purchase date format, set to None")
                cleaned_data['last_purchase_date'] = None
        else:
            cleaned_data['last_purchase_date'] = None
        
        return True, cleaned_data, warnings
    
    def process_customer_file(self, filename):
        """
        Process a customer data file with comprehensive error handling.
        Continues processing even when individual records fail.
        """
        self.log_message("INFO", f"Starting to process file: {filename}")
        
        try:
            # Try to load the file
            with open(filename, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            self.log_message("ERROR", f"File {filename} not found")
            return None
        except json.JSONDecodeError as e:
            self.log_message("ERROR", f"Invalid JSON in {filename}: {e}")
            return None
        except Exception as e:
            self.log_message("ERROR", f"Unexpected error reading {filename}: {e}")
            return None
        
        # Handle different data structures
        if isinstance(data, dict) and 'customers' in data:
            customers = data['customers']
        elif isinstance(data, list):
            customers = data
        else:
            self.log_message("ERROR", "Data format not recognized")
            return None
        
        # Process each customer
        analyzer = CustomerAnalyzer()
        processed_customers = []
        
        for i, customer_data in enumerate(customers):
            try:
                # Validate and clean data
                is_valid, cleaned_data, warnings = self.validate_customer_data(customer_data)
                
                if warnings:
                    self.warning_count += len(warnings)
                    for warning in warnings:
                        self.log_message("WARNING", f"Customer {i}: {warning}")
                
                if not is_valid:
                    self.error_count += 1
                    self.log_message("ERROR", f"Customer {i}: Invalid data structure")
                    continue
                
                # Process with analyzer
                analysis_result = analyzer.analyze_customer(cleaned_data)
                if analysis_result:
                    # Add behavioral features
                    behavioral_features = analyzer.extract_behavioral_features(cleaned_data)
                    analysis_result.update(behavioral_features)
                    analysis_result['customer_id'] = cleaned_data['id']
                    
                    processed_customers.append(analysis_result)
                    self.processed_count += 1
                else:
                    self.error_count += 1
                    self.log_message("ERROR", f"Customer {i}: Analysis failed")
                    
            except Exception as e:
                self.error_count += 1
                self.log_message("ERROR", f"Customer {i}: Unexpected error - {e}")
                continue
        
        # Generate summary
        summary = {
            "total_input_records": len(customers),
            "successfully_processed": self.processed_count,
            "errors": self.error_count,
            "warnings": self.warning_count,
            "success_rate": (self.processed_count / len(customers)) * 100 if customers else 0
        }
        
        self.log_message("INFO", f"Processing complete. Success rate: {summary['success_rate']:.1f}%")
        
        return {
            "processed_customers": processed_customers,
            "summary": summary
        }


In [16]:
# Exercise 3: Advanced Data Transformation
# Objective: Master complex data manipulation for AI preprocessing

class AdvancedDataTransformer:
    """
    Advanced data transformation utilities for AI preprocessing.
    Demonstrates techniques used in production ML pipelines.
    """
    
    def __init__(self):
        self.feature_encoders = {}
        
    def create_one_hot_encoding(self, categories):
        """Create one-hot encoding mapping for categorical features"""
        encoding_map = {}
        for i, category in enumerate(sorted(set(categories))):
            encoding_map[category] = i
        return encoding_map
    
    def transform_customer_data_for_ml(self, customers_data):
        """
        Transform customer data into ML-ready format with one-hot encoding.
        This is the type of preprocessing you'd do before training ML models.
        """
        if not customers_data:
            return [], {}
        
        # Extract all unique categorical values for encoding
        all_locations = set()
        all_preferences = set()
        all_age_segments = set()
        all_lifecycle_stages = set()
        
        # Collect all categorical values first
        for customer in customers_data:
            all_locations.add(customer.get('location', 'unknown'))
            all_preferences.update(customer.get('preferences', []))
            all_age_segments.add(customer.get('age_segment', 'unknown'))
            all_lifecycle_stages.add(customer.get('lifecycle_stage', 'unknown'))
        
        # Create encoding mappings
        location_encoding = self.create_one_hot_encoding(all_locations)
        preference_encoding = self.create_one_hot_encoding(all_preferences)
        age_segment_encoding = self.create_one_hot_encoding(all_age_segments)
        lifecycle_encoding = self.create_one_hot_encoding(all_lifecycle_stages)
        
        # Store encoders for future use
        self.feature_encoders = {
            'location': location_encoding,
            'preferences': preference_encoding,
            'age_segment': age_segment_encoding,
            'lifecycle_stage': lifecycle_encoding
        }
        
        # Transform each customer
        transformed_data = []
        feature_names = []
        
        for customer in customers_data:
            features = []
            
            # Numerical features (already processed)
            numerical_features = [
                customer.get('customer_value', 0),
                customer.get('purchase_diversity_score', 0),
                customer.get('days_since_last_purchase', 999),
                customer.get('preference_breadth', 0),
                len(customer.get('purchase_history', [])),  # frequency
                customer.get('age', 30)  # default age
            ]
            features.extend(numerical_features)
            
            # One-hot encode location
            location = customer.get('location', 'unknown')
            location_vector = [0] * len(location_encoding)
            if location in location_encoding:
                location_vector[location_encoding[location]] = 1
            features.extend(location_vector)
            
            # One-hot encode age segment
            age_segment = customer.get('age_segment', 'unknown')
            age_segment_vector = [0] * len(age_segment_encoding)
            if age_segment in age_segment_encoding:
                age_segment_vector[age_segment_encoding[age_segment]] = 1
            features.extend(age_segment_vector)
            
            # One-hot encode lifecycle stage
            lifecycle = customer.get('lifecycle_stage', 'unknown')
            lifecycle_vector = [0] * len(lifecycle_encoding)
            if lifecycle in lifecycle_encoding:
                lifecycle_vector[lifecycle_encoding[lifecycle]] = 1
            features.extend(lifecycle_vector)
            
            # Multi-hot encode preferences (customer can have multiple)
            preference_vector = [0] * len(preference_encoding)
            customer_prefs = customer.get('preferences', [])
            for pref in customer_prefs:
                if pref in preference_encoding:
                    preference_vector[preference_encoding[pref]] = 1
            features.extend(preference_vector)
            
            # Binary features
            binary_features = [
                1 if customer.get('is_premium', False) else 0,
                1 if customer.get('category') == 'high_value' else 0,
                1 if customer.get('location_type') == 'metro' else 0
            ]
            features.extend(binary_features)
            
            transformed_data.append(features)
        
        # Create feature names for interpretability
        if not feature_names:  # Generate feature names once
            feature_names = [
                'customer_value', 'purchase_diversity', 'days_since_last_purchase',
                'preference_breadth', 'purchase_frequency', 'age'
            ]
            
            # Location features
            feature_names.extend([f'location_{loc}' for loc in sorted(location_encoding.keys())])
            
            # Age segment features
            feature_names.extend([f'age_segment_{seg}' for seg in sorted(age_segment_encoding.keys())])
            
            # Lifecycle features
            feature_names.extend([f'lifecycle_{stage}' for stage in sorted(lifecycle_encoding.keys())])
            
            # Preference features
            feature_names.extend([f'preference_{pref}' for pref in sorted(preference_encoding.keys())])
            
            # Binary features
            feature_names.extend(['is_premium', 'is_high_value', 'is_metro'])
        
        return transformed_data, {
            'feature_names': feature_names,
            'encoders': self.feature_encoders,
            'total_features': len(feature_names)
        }
    
    def generate_feature_summary(self, transformed_data, metadata):
        """Generate summary statistics of the transformed features"""
        if not transformed_data:
            return {}
        
        import statistics
        
        feature_names = metadata['feature_names']
        summary = {}
        
        # Calculate statistics for each feature
        for i, feature_name in enumerate(feature_names):
            feature_values = [row[i] for row in transformed_data]
            
            summary[feature_name] = {
                'mean': statistics.mean(feature_values),
                'min': min(feature_values),
                'max': max(feature_values),
                'non_zero_count': sum(1 for x in feature_values if x != 0)
            }
            
            # Add standard deviation for numerical features
            if len(set(feature_values)) > 2:  # Not binary
                try:
                    summary[feature_name]['std'] = statistics.stdev(feature_values)
                except:
                    summary[feature_name]['std'] = 0
        
        return summary



In [17]:

# Sample test data and demonstration
if __name__ == "__main__":
    # Create sample data with various edge cases for testing
    sample_customers = [
        {
            "id": 1,
            "purchase_history": [45.99, 23.50, 67.80, 12.99],
            "age": 28,
            "location": "SF",
            "preferences": ["electronics", "books"],
            "is_premium": True,
            "last_purchase_date": "2024-06-15"
        },
        {
            "id": 2,
            "purchase_history": [12.99, 8.50],
            "age": 34,
            "location": "NYC",
            "preferences": ["music", "movies", "books"],
            "is_premium": False,
            "last_purchase_date": "2024-05-20"
        },
        {
            "id": 3,
            "purchase_history": [99.99, 150.00],
            "age": 22,
            "location": "LA",
            "preferences": ["electronics"],
            "is_premium": True,
            "last_purchase_date": "2024-06-20"
        },
        # Edge cases for testing error handling
        {
            "id": "4",  # String ID (should be converted)
            "purchase_history": ["invalid", 25.50, -10.00],  # Mixed invalid data
            "age": "thirty",  # Invalid age
            "location": "",  # Empty location
            "preferences": "not a list",  # Invalid preferences
            "is_premium": "yes"  # String boolean
        },
        {
            # Missing ID - should be rejected
            "purchase_history": [50.00],
            "age": 25
        }
    ]
    
    print("=== Exercise Solutions Demo ===\n")
    
    # Test Exercise 1: Enhanced behavioral features
    print("1. Testing Enhanced Behavioral Features:")
    analyzer = CustomerAnalyzer()
    
    for customer in sample_customers[:3]:  # Use valid customers only
        features = analyzer.extract_behavioral_features(customer)
        print(f"Customer {customer['id']} behavioral features:")
        for key, value in features.items():
            print(f"  {key}: {value}")
        print()
    
    # Test Exercise 2: Robust error handling
    print("2. Testing Robust Error Handling:")
    
    # Save sample data to file for testing
    with open('test_customers.json', 'w') as f:
        json.dump({"customers": sample_customers}, f, indent=2)
    
    pipeline = RobustDataPipeline("test_pipeline.log")
    result = pipeline.process_customer_file('test_customers.json')
    
    if result:
        print(f"Processing Summary:")
        for key, value in result['summary'].items():
            print(f"  {key}: {value}")
        print(f"\nSuccessfully processed {len(result['processed_customers'])} customers")
    
    # Test Exercise 3: Advanced data transformation
    print("\n3. Testing Advanced Data Transformation:")
    
    if result and result['processed_customers']:
        transformer = AdvancedDataTransformer()
        transformed_data, metadata = transformer.transform_customer_data_for_ml(
            result['processed_customers']
        )
        
        print(f"Transformed {len(transformed_data)} customers into ML-ready format")
        print(f"Total features per customer: {metadata['total_features']}")
        print(f"Feature categories: {len(metadata['encoders'])} categorical encodings")
        
        # Show sample transformed customer
        if transformed_data:
            print(f"\nSample transformed customer (first 10 features):")
            print(f"Features: {transformed_data[0][:10]}")
            print(f"Feature names: {metadata['feature_names'][:10]}")
            
        # Generate and display feature summary
        summary = transformer.generate_feature_summary(transformed_data, metadata)
        print(f"\nFeature Summary (first 5 features):")
        for i, (feature_name, stats) in enumerate(list(summary.items())[:5]):
            print(f"  {feature_name}: mean={stats['mean']:.2f}, min={stats['min']}, max={stats['max']}")
    
    print("\n=== All Exercises Completed Successfully! ===")
    print("Check 'test_pipeline.log' for detailed processing logs.")

=== Exercise Solutions Demo ===

1. Testing Enhanced Behavioral Features:
Customer 1 behavioral features:
  days_since_last_purchase: 377
  recency_category: distant
  purchase_diversity_score: 0.563
  purchase_pattern: moderate
  seasonal_preference: occasional
  lifecycle_stage: developing
  preference_breadth: 2
  preference_category: focused
  age_segment: millennial
  location_type: metro

Customer 2 behavioral features:
  days_since_last_purchase: 403
  recency_category: distant
  purchase_diversity_score: 0.209
  purchase_pattern: consistent
  seasonal_preference: occasional
  lifecycle_stage: developing
  preference_breadth: 3
  preference_category: moderate
  age_segment: millennial
  location_type: metro

Customer 3 behavioral features:
  days_since_last_purchase: 372
  recency_category: distant
  purchase_diversity_score: 0.2
  purchase_pattern: consistent
  seasonal_preference: holiday_buyer
  lifecycle_stage: established
  preference_breadth: 1
  preference_category: focus