In [1]:
# Customer Churn Analysis - Data Science Project
# Author: Lavanyaa Gupta
# Description: A comprehensive analysis of customer churn using machine learning

In [None]:
# Import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
from sklearn.impute import SimpleImputer
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Build classes
class CustomerChurnAnalyzer:
    """
    A comprehensive customer churn analysis tool that demonstrates
    key data science skills including EDA, feature engineering, and ML modeling.
    """
    
    def __init__(self):
        self.df = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.scaler = StandardScaler()
        self.models = {}
        self.results = {}
        
    def generate_sample_data(self, n_samples=1000):
        """Generate realistic customer data for demonstration purposes."""
        np.random.seed(42)
        
        # Generate customer features
        data = {
            'customer_id': range(1, n_samples + 1),
            'age': np.random.normal(45, 15, n_samples).astype(int),
            'tenure_months': np.random.exponential(24, n_samples).astype(int),
            'monthly_charges': np.random.normal(65, 20, n_samples),
            'total_charges': np.random.normal(1500, 800, n_samples),
            'contract_type': np.random.choice(['Month-to-month', 'One year', 'Two year'], 
                                            n_samples, p=[0.5, 0.3, 0.2]),
            'internet_service': np.random.choice(['DSL', 'Fiber optic', 'No'], 
                                               n_samples, p=[0.4, 0.4, 0.2]),
            'tech_support': np.random.choice(['Yes', 'No'], n_samples, p=[0.3, 0.7]),
            'payment_method': np.random.choice(['Electronic check', 'Mailed check', 
                                              'Bank transfer', 'Credit card'], 
                                            n_samples, p=[0.4, 0.2, 0.2, 0.2])
        }
        
        # Create realistic churn based on features
        churn_prob = (
            0.1 +  # Base churn rate
            0.3 * (data['contract_type'] == 'Month-to-month') +
            0.2 * (data['payment_method'] == 'Electronic check') +
            0.15 * (data['tech_support'] == 'No') +
            0.1 * (np.array(data['monthly_charges']) > 80) +
            -0.2 * (np.array(data['tenure_months']) > 24)
        )
        
        data['churn'] = np.random.binomial(1, np.clip(churn_prob, 0, 1), n_samples)
        
        self.df = pd.DataFrame(data)
        
        # Add some missing values for realistic data cleaning demo
        missing_indices = np.random.choice(self.df.index, size=int(0.05 * n_samples), replace=False)
        self.df.loc[missing_indices, 'total_charges'] = np.nan
        
        print(f"Generated dataset with {n_samples} customers")
        return self.df
    
    def load_data(self, filepath=None):
        """Load data from file or generate sample data."""
        if filepath:
            try:
                self.df = pd.read_csv(filepath)
                print(f"Loaded data from {filepath}")
            except FileNotFoundError:
                print(f"File {filepath} not found. Generating sample data instead.")
                self.generate_sample_data()
        else:
            self.generate_sample_data()
        
        return self.df
    
    def exploratory_data_analysis(self):
        """Perform comprehensive exploratory data analysis."""
        print("=== EXPLORATORY DATA ANALYSIS ===\n")
        
        # Basic info
        print("Dataset Shape:", self.df.shape)
        print("\nData Types:")
        print(self.df.dtypes)
        
        print("\nMissing Values:")
        print(self.df.isnull().sum())
        
        print("\nChurn Distribution:")
        churn_counts = self.df['churn'].value_counts()
        print(churn_counts)
        print(f"Churn Rate: {churn_counts[1] / len(self.df) * 100:.2f}%")
        
        # Statistical summary
        print("\nNumerical Features Summary:")
        print(self.df.describe())
        
        # Create visualizations
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        
        # Churn distribution
        self.df['churn'].value_counts().plot(kind='bar', ax=axes[0,0], color=['skyblue', 'salmon'])
        axes[0,0].set_title('Churn Distribution')
        axes[0,0].set_xlabel('Churn (0=No, 1=Yes)')
        
        # Age distribution by churn
        self.df.boxplot(column='age', by='churn', ax=axes[0,1])
        axes[0,1].set_title('Age Distribution by Churn')
        
        # Monthly charges by churn
        self.df.boxplot(column='monthly_charges', by='churn', ax=axes[0,2])
        axes[0,2].set_title('Monthly Charges by Churn')
        
        # Tenure distribution
        self.df['tenure_months'].hist(bins=30, ax=axes[1,0], alpha=0.7, color='green')
        axes[1,0].set_title('Tenure Distribution')
        axes[1,0].set_xlabel('Tenure (months)')
        
        # Contract type vs churn
        pd.crosstab(self.df['contract_type'], self.df['churn']).plot(kind='bar', 
                                                                    ax=axes[1,1], 
                                                                    color=['skyblue', 'salmon'])
        axes[1,1].set_title('Contract Type vs Churn')
        axes[1,1].legend(['No Churn', 'Churn'])
        
        # Correlation heatmap
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        corr_matrix = self.df[numeric_cols].corr()
        sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, ax=axes[1,2])
        axes[1,2].set_title('Feature Correlation Matrix')
        
        plt.tight_layout()
        plt.show()
        
        return self.df
    
    def preprocess_data(self):
        """Clean and preprocess the data for machine learning."""
        print("\n=== DATA PREPROCESSING ===\n")
        
        # Handle missing values
        print("Handling missing values...")
        imputer = SimpleImputer(strategy='median')
        self.df['total_charges'] = imputer.fit_transform(self.df[['total_charges']])
        
        # Create feature engineering
        print("Engineering new features...")
        self.df['avg_monthly_charges'] = self.df['total_charges'] / (self.df['tenure_months'] + 1)
        self.df['high_value_customer'] = (self.df['monthly_charges'] > self.df['monthly_charges'].quantile(0.75)).astype(int)
        self.df['long_tenure'] = (self.df['tenure_months'] > 24).astype(int)
        
        # Encode categorical variables
        print("Encoding categorical variables...")
        le = LabelEncoder()
        categorical_cols = ['contract_type', 'internet_service', 'tech_support', 'payment_method']
        
        for col in categorical_cols:
            self.df[col + '_encoded'] = le.fit_transform(self.df[col])
        
        # Prepare features and target
        feature_cols = ['age', 'tenure_months', 'monthly_charges', 'total_charges', 
                       'avg_monthly_charges', 'high_value_customer', 'long_tenure'] + \
                       [col + '_encoded' for col in categorical_cols]
        
        X = self.df[feature_cols]
        y = self.df['churn']
        
        # Split the data
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        
        # Scale features
        self.X_train_scaled = self.scaler.fit_transform(self.X_train)
        self.X_test_scaled = self.scaler.transform(self.X_test)
        
        print(f"Training set size: {self.X_train.shape}")
        print(f"Test set size: {self.X_test.shape}")
        
        return self.X_train, self.X_test, self.y_train, self.y_test
    
    def train_models(self):
        """Train multiple machine learning models."""
        print("\n=== MODEL TRAINING ===\n")
        
        # Define models
        models = {
            'Logistic Regression': LogisticRegression(random_state=42),
            'Random Forest': RandomForestClassifier(random_state=42, n_estimators=100)
        }
        
        # Train and evaluate models
        for name, model in models.items():
            print(f"Training {name}...")
            
            # Use scaled data for Logistic Regression, original for Random Forest
            if name == 'Logistic Regression':
                X_train_use = self.X_train_scaled
                X_test_use = self.X_test_scaled
            else:
                X_train_use = self.X_train
                X_test_use = self.X_test
            
            # Train model
            model.fit(X_train_use, self.y_train)
            
            # Make predictions
            y_pred = model.predict(X_test_use)
            y_pred_proba = model.predict_proba(X_test_use)[:, 1]
            
            # Calculate metrics
            cv_scores = cross_val_score(model, X_train_use, self.y_train, cv=5, scoring='roc_auc')
            roc_auc = roc_auc_score(self.y_test, y_pred_proba)
            
            # Store results
            self.models[name] = model
            self.results[name] = {
                'predictions': y_pred,
                'probabilities': y_pred_proba,
                'cv_scores': cv_scores,
                'roc_auc': roc_auc,
                'X_test': X_test_use
            }
            
            print(f"{name} - Cross-validation AUC: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")
            print(f"{name} - Test AUC: {roc_auc:.3f}")
            print()
    
    def evaluate_models(self):
        """Evaluate and compare model performance."""
        print("\n=== MODEL EVALUATION ===\n")
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        
        # ROC Curves
        for name, results in self.results.items():
            fpr, tpr, _ = roc_curve(self.y_test, results['probabilities'])
            axes[0,0].plot(fpr, tpr, label=f"{name} (AUC = {results['roc_auc']:.3f})")
        
        axes[0,0].plot([0, 1], [0, 1], 'k--', label='Random')
        axes[0,0].set_xlabel('False Positive Rate')
        axes[0,0].set_ylabel('True Positive Rate')
        axes[0,0].set_title('ROC Curves')
        axes[0,0].legend()
        
        # Feature Importance (Random Forest)
        if 'Random Forest' in self.models:
            rf_model = self.models['Random Forest']
            feature_names = self.X_train.columns
            importances = rf_model.feature_importances_
            indices = np.argsort(importances)[::-1][:10]  # Top 10 features
            
            axes[0,1].bar(range(len(indices)), importances[indices])
            axes[0,1].set_xlabel('Features')
            axes[0,1].set_ylabel('Importance')
            axes[0,1].set_title('Top 10 Feature Importances (Random Forest)')
            axes[0,1].set_xticks(range(len(indices)))
            axes[0,1].set_xticklabels([feature_names[i] for i in indices], rotation=45)
        
        # Confusion Matrix for best model
        best_model_name = max(self.results.keys(), key=lambda x: self.results[x]['roc_auc'])
        cm = confusion_matrix(self.y_test, self.results[best_model_name]['predictions'])
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[1,0])
        axes[1,0].set_xlabel('Predicted')
        axes[1,0].set_ylabel('Actual')
        axes[1,0].set_title(f'Confusion Matrix - {best_model_name}')
        
        # Model Comparison
        model_names = list(self.results.keys())
        auc_scores = [self.results[name]['roc_auc'] for name in model_names]
        cv_means = [self.results[name]['cv_scores'].mean() for name in model_names]
        
        x = np.arange(len(model_names))
        width = 0.35
        
        axes[1,1].bar(x - width/2, auc_scores, width, label='Test AUC', color='skyblue')
        axes[1,1].bar(x + width/2, cv_means, width, label='CV AUC', color='salmon')
        axes[1,1].set_xlabel('Models')
        axes[1,1].set_ylabel('AUC Score')
        axes[1,1].set_title('Model Performance Comparison')
        axes[1,1].set_xticks(x)
        axes[1,1].set_xticklabels(model_names)
        axes[1,1].legend()
        
        plt.tight_layout()
        plt.show()
        
        # Print detailed results
        print("Detailed Classification Reports:")
        print("=" * 50)
        
        for name, results in self.results.items():
            print(f"\n{name}:")
            print(classification_report(self.y_test, results['predictions']))
    
    def generate_insights(self):
        """Generate business insights from the analysis."""
        print("\n=== BUSINESS INSIGHTS ===\n")
        
        insights = []
        
        # Churn rate analysis
        churn_rate = self.df['churn'].mean() * 100
        insights.append(f"Overall churn rate is {churn_rate:.1f}%")
        
        # Contract type impact
        contract_churn = self.df.groupby('contract_type')['churn'].mean()
        worst_contract = contract_churn.idxmax()
        insights.append(f"'{worst_contract}' contracts have the highest churn rate at {contract_churn[worst_contract]*100:.1f}%")
        
        # Payment method impact
        payment_churn = self.df.groupby('payment_method')['churn'].mean()
        worst_payment = payment_churn.idxmax()
        insights.append(f"Customers using '{worst_payment}' have the highest churn rate at {payment_churn[worst_payment]*100:.1f}%")
        
        # High-value customer analysis
        if 'high_value_customer' in self.df.columns:
            high_value_churn = self.df[self.df['high_value_customer'] == 1]['churn'].mean()
            low_value_churn = self.df[self.df['high_value_customer'] == 0]['churn'].mean()
            insights.append(f"High-value customers churn at {high_value_churn*100:.1f}% vs {low_value_churn*100:.1f}% for others")
        
        # Model performance insight
        best_model_name = max(self.results.keys(), key=lambda x: self.results[x]['roc_auc'])
        best_auc = self.results[best_model_name]['roc_auc']
        insights.append(f"Best performing model is {best_model_name} with AUC of {best_auc:.3f}")
        
        print("Key Insights:")
        for i, insight in enumerate(insights, 1):
            print(f"{i}. {insight}")
        
        print("\nRecommendations:")
        print("1. Focus retention efforts on month-to-month contract customers")
        print("2. Investigate payment method preferences and offer incentives for stable methods")
        print("3. Implement early warning systems using the trained model")
        print("4. Develop targeted retention campaigns for high-risk customer segments")
    
    def run_full_analysis(self, filepath=None):
        """Run the complete analysis pipeline."""
        print("Starting Customer Churn Analysis...")
        print("=" * 50)
        
        # Load data
        self.load_data(filepath)
        
        # EDA
        self.exploratory_data_analysis()
        
        # Preprocessing
        self.preprocess_data()
        
        # Model training
        self.train_models()
        
        # Evaluation
        self.evaluate_models()
        
        # Insights
        self.generate_insights()
        
        print("\n" + "=" * 50)
        print("Analysis Complete!")
        
        return self.df, self.models, self.results


def main():
    """Build a ``CustomerChurnAnalyzer`` and run the complete pipeline once."""
    churn_analyzer = CustomerChurnAnalyzer()

    # No file path is passed, so the analyzer works on generated sample data.
    processed_df, trained_models, model_results = churn_analyzer.run_full_analysis()

    # To persist the processed table, uncomment the two lines below:
    # processed_df.to_csv('processed_customer_data.csv', index=False)
    # print("\nResults saved to 'processed_customer_data.csv'")


# Run the full analysis only when this code is executed directly, not when
# it is imported as a module.
if __name__ == "__main__":
    main()

In [None]:
# Usage Examples:

# 1. Basic usage:
#    analyzer = CustomerChurnAnalyzer()
#    analyzer.run_full_analysis()

# 2. With your own data:
#    analyzer = CustomerChurnAnalyzer()
#    analyzer.run_full_analysis('your_data.csv')

# 3. Step by step:
#    analyzer = CustomerChurnAnalyzer()
#    analyzer.load_data()
#    analyzer.exploratory_data_analysis()
#    analyzer.preprocess_data()
#    analyzer.train_models()
#    analyzer.evaluate_models()
#    analyzer.generate_insights()