In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, GradientBoostingRegressor
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler, SplineTransformer
from sklearn.metrics import mean_squared_error
from scipy.stats import norm
from scipy.optimize import minimize
import warnings
warnings.filterwarnings('ignore')

class PermutationWeighting:
    """Permutation Weighting implementation as described in Arbour et al. (2020)"""
    
    def __init__(self, A, X, classifier='logit', estimand='ATE', num_replicates=100):
        """
        Initialize Permutation Weighting
        
        Parameters:
        -----------
        A : array-like
            Treatment variable (binary or continuous)
        X : array-like
            Covariate matrix
        classifier : str
            Classifier type ('logit', 'boosting', 'sgd', 'mlp')
        estimand : str
            Estimand type ('ATE' or 'ATT')
        num_replicates : int
            Number of replicates to run
        """
        self.A = np.array(A).reshape(-1)
        self.X = np.array(X)
        self.classifier = classifier
        self.estimand = estimand.upper()
        self.num_replicates = num_replicates
        
        # Check data
        self._check_data()
        
        # Get data factory
        if self._is_binary_treatment() and self.estimand == 'ATE':
            self.factory = self._binary_ate_factory()
        elif self._is_binary_treatment() and self.estimand == 'ATT':
            self.factory = self._att_factory()
        else:
            self.factory = self._ate_factory()
        
        # Get trainer factory
        self.trainer = self._get_trainer_factory()
        
        # Compute weights
        self.weights = self._compute_weights()
    
    def _check_data(self):
        """Validate the input data"""
        if self.A.ndim != 1:
            raise ValueError("A must be a 1-dimensional array")
        
        if self.X.ndim != 2:
            raise ValueError("X must be a 2-dimensional array (matrix)")
        
        if len(self.A) != self.X.shape[0]:
            raise ValueError(f"A and X must have the same number of observations")
    
    def _is_binary_treatment(self):
        """Check if treatment is binary"""
        return len(np.unique(self.A)) == 2
    
    def _ate_factory(self):
        """Factory for ATE estimand"""
        N = len(self.A)
        
        def factory():
            # Generate bootstrap indices
            idx = np.random.choice(N, N, replace=True)
            
            # For permuted data, independently sample treatment and covariates
            perm_idx = np.random.permutation(N)
            pA = self.A[perm_idx]
            pX = self.X[idx]
            
            # For observed data, use same bootstrap indices
            oA = self.A[idx]
            oX = self.X[idx]
            
            return {
                'permuted': {'C': 1, 'A': pA, 'X': pX},
                'observed': {'C': 0, 'A': oA, 'X': oX}
            }
        
        return factory
    
    def _att_factory(self):
        """Factory for ATT estimand"""
        N = len(self.A)
        A1_idx = np.where(self.A == 1)[0]
        
        if len(A1_idx) == 0:
            raise ValueError('A must take the value of one at least once for the ATT.')
        
        def factory():
            # Sample treatment with replacement
            pA = np.random.choice(self.A, N, replace=True)
            
            # Sample covariates from treated units
            p_idx = np.random.choice(A1_idx, N, replace=True)
            pX = self.X[p_idx]
            
            # Sample observed data indices
            idx = np.random.choice(N, N, replace=True)
            
            return {
                'permuted': {'C': 1, 'A': pA, 'X': pX},
                'observed': {'C': 0, 'A': self.A[idx], 'X': self.X[idx]}
            }
        
        return factory
    
    def _binary_ate_factory(self):
        """Factory for binary ATE estimand"""
        N = len(self.A)
        unique_A = np.unique(self.A)
        
        def factory():
            # Create cross-product of unique A values with X
            return {
                'permuted': {
                    'C': 1,
                    'A': np.repeat(unique_A, N),
                    'X': np.vstack([self.X, self.X])
                },
                'observed': {
                    'C': 0,
                    'A': self.A,
                    'X': self.X
                }
            }
        
        return factory
    
    def _construct_df(self, data):
        """Construct a DataFrame from permuted and observed data"""
        # Extract dimensions
        n_permuted = len(data['permuted']['A'])
        n_observed = len(data['observed']['A'])
        n_features = data['permuted']['X'].shape[1]
        
        # Create base features
        df_dict = {
            'C': np.concatenate([
                np.repeat(data['permuted']['C'], n_permuted),
                np.repeat(data['observed']['C'], n_observed)
            ]),
            'A': np.concatenate([data['permuted']['A'], data['observed']['A']])
        }
        
        # Add X features
        X_combined = np.vstack([data['permuted']['X'], data['observed']['X']])
        for i in range(n_features):
            df_dict[f'X{i}'] = X_combined[:, i]
        
        # Add interactions between A and X
        for i in range(n_features):
            df_dict[f'A_X{i}'] = df_dict['A'] * df_dict[f'X{i}']
        
        return pd.DataFrame(df_dict)
    
    def _construct_eval_df(self, A, X):
        """Construct a DataFrame for evaluation"""
        n_features = X.shape[1]
        
        df_dict = {'A': A}
        
        # Add X features
        for i in range(n_features):
            df_dict[f'X{i}'] = X[:, i]
        
        # Add interactions between A and X
        for i in range(n_features):
            df_dict[f'A_X{i}'] = df_dict['A'] * df_dict[f'X{i}']
        
        return pd.DataFrame(df_dict)
    
    def _get_trainer_factory(self):
        """Get the appropriate trainer based on classifier type"""
        if self.classifier == 'logit':
            return self._logit_trainer
        elif self.classifier == 'boosting':
            return self._boosting_trainer
        elif self.classifier == 'sgd':
            return self._sgd_trainer
        elif self.classifier == 'mlp':
            return self._mlp_trainer
        else:
            raise ValueError(f'Unknown classifier: {self.classifier}')
    
    def _logit_trainer(self, data):
        """Fit a logistic regression on permuted-vs-observed data.

        The training frame comes from `_construct_df(data)`; the label is the
        'C' column and every other column is a feature. Returns a function
        `weight_function(A, X)` that maps evaluation points to odds
        p / (1 - p), where p is column 1 of the model's `predict_proba`
        output, clipped to [0.00001, 0.99999].
        """
        frame = self._construct_df(data)

        # Everything except the label column is a predictor.
        labels = frame['C']
        features = frame.drop(columns='C')

        clf = LogisticRegression(
            penalty='l2',
            C=1.0,
            solver='lbfgs',
            max_iter=1000,
            random_state=42,
        )
        clf.fit(features, labels)

        def weight_function(A, X):
            """Return the clipped odds p / (1 - p) for each (A, X) row."""
            scored = clf.predict_proba(self._construct_eval_df(A, X))
            # Clipping keeps the odds finite when the model is near-certain.
            p = np.clip(scored[:, 1], 0.00001, 0.99999)
            return p / (1 - p)

        return weight_function
    
    def _boosting_trainer(self, data):
        """Fit a gradient boosting classifier on permuted-vs-observed data.

        The training frame comes from `_construct_df(data)`; the label is the
        'C' column and every other column is a feature. Returns a function
        `weight_function(A, X)` that maps evaluation points to odds
        p / (1 - p), where p is column 1 of the model's `predict_proba`
        output, clipped to [0.00001, 0.99999].
        """
        frame = self._construct_df(data)

        # Everything except the label column is a predictor.
        labels = frame['C']
        features = frame.drop(columns='C')

        booster = GradientBoostingClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=3,
            random_state=42,
        )
        booster.fit(features, labels)

        def weight_function(A, X):
            """Return the clipped odds p / (1 - p) for each (A, X) row."""
            scored = booster.predict_proba(self._construct_eval_df(A, X))
            # Clipping keeps the odds finite when the model is near-certain.
            p = np.clip(scored[:, 1], 0.00001, 0.99999)
            return p / (1 - p)

        return weight_function
    
    def _sgd_trainer(self, data):
        """Train an SGD-based logistic regression model"""
        df = self._construct_df(data)
        
        # Separate features and target
        X_cols = [col for col in df.columns if col != 'C']
        X_train = df[X_cols]
        y_train = df['C']
        
        # Identify columns to scale: everything except 'A'
        to_scale_cols = [col for col in X_cols if col != 'A']
        
        # Apply scaling
        scaler = StandardScaler()
        X_train_scaled = X_train.copy()
        X_train_scaled[to_scale_cols] = scaler.fit_transform(X_train[to_scale_cols])
        
        # Train model
        model = SGDClassifier(
            loss='log_loss',
            penalty='l2',
            alpha=0.001,

SyntaxError: incomplete input (1407348398.py, line 293)