# In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from scipy.optimize import minimize
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam
from matplotlib.dates import DateFormatter

class CDSHedgingOptimizer:
    def __init__(self, pnl_data, cds_data, initial_capital=1000000):
        """Store the inputs, set the hedging parameters and create the models.

        Args:
            pnl_data: PnL table; merged with ``cds_data`` on 'Date' by
                ``preprocess_data``.
            cds_data: CDS index table; merged with ``pnl_data`` on 'Date' by
                ``preprocess_data``.
            initial_capital: Starting portfolio value used by ``run_backtest``.
        """
        self.pnl_data, self.cds_data = pnl_data, cds_data
        self.initial_capital = initial_capital
        self.scaler = StandardScaler()
        self.models = {}
        self.cds_indexes = ['ITRX Main', 'ITRX XOVER', 'CDX IG', 'CDX HY', 'CDX EM']

        # One row per index: (name, coupon in basis points, CS01 constraint)
        index_terms = (
            ('ITRX XOVER', 500, 100000),
            ('CDX HY', 500, 100000),
            ('CDX EM', 100, 100000),
            ('ITRX Main', 100, 250000),
            ('CDX IG', 100, 250000),
        )
        self.coupon_rates = {name: coupon for name, coupon, _ in index_terms}
        self.cs01_constraints = {name: limit for name, _, limit in index_terms}

        self.roll_cost = 0.20            # 20% of spread per annum
        self.max_hedges = 2              # at most 2 CDS indexes at a time
        self.notional = 10000000         # $10mm notional for CS01 calculation
        self.max_holding_period = 20     # maximum holding period in days
        # Consecutive steps each index has been held; starts at zero for all
        self.hedge_durations = dict.fromkeys(self.cds_indexes, 0)

        # Models are built last so they can read the attributes set above
        self._initialize_models()
        
    def _initialize_models(self):
        """Create the three untrained models and register them in ``self.models``.

        Keys written: 'pnl_predictor' (random forest, 100 trees),
        'hedge_recommender' (network from ``_build_nn_model``) and
        'cost_estimator' (random forest, 50 trees).
        """
        # Factories are invoked in order, one at a time, as each key is stored
        model_factories = (
            ('pnl_predictor', lambda: RandomForestRegressor(n_estimators=100)),
            ('hedge_recommender', self._build_nn_model),
            ('cost_estimator', lambda: RandomForestRegressor(n_estimators=50)),
        )
        for model_key, build in model_factories:
            self.models[model_key] = build()
        
    def _build_nn_model(self):
        """Build and compile the network used as hedge recommender.

        The input width is the number of CDS indexes plus one and the linear
        output layer has one unit per CDS index.

        Returns:
            The compiled Keras ``Sequential`` model (Adam, learning rate
            0.001, mean-squared-error loss).
        """
        n_indexes = len(self.cds_indexes)
        layer_stack = [
            Dense(64, activation='relu', input_shape=(n_indexes + 1,)),
            Dropout(0.2),
            Dense(32, activation='relu'),
            Dense(n_indexes, activation='linear'),
        ]
        network = Sequential(layer_stack)
        network.compile(optimizer=Adam(0.001), loss='mse')
        return network
    
    def preprocess_data(self):
        # Combine PnL and CDS data
        combined = pd.merge(self.pnl_data, self.cds_data, on='Date')
        
        # Calculate daily PnL changes
        combined['PnL_change'] = combined['PnL'].diff()
        
        # Calculate CDS returns
        for index in self.cds_indexes:
            combined[f'{index}_return'] = combined[index].pct_change()
            
        # Drop NA values
        combined = combined.dropna()
        
        # Normalize features
        features = ['PnL'] + [f'{index}_return' for index in self.cds_indexes]
        self.scaler.fit(combined[features])
        
        return combined
    
    def train_models(self, data):
        # Prepare features and targets
        X = data[['PnL'] + [f'{index}_return' for index in self.cds_indexes]]
        y_pnl = data['PnL_change']
        y_hedge = data[[f'{index}_return' for index in self.cds_indexes]]  # For NN
        
        # Scale features
        X_scaled = self.scaler.transform(X)
        
        # Train PnL predictor
        self.models['pnl_predictor'].fit(X_scaled, y_pnl)
        
        # Train hedge recommender
        self.models['hedge_recommender'].fit(
            X_scaled, 
            y_hedge.values,
            epochs=50,
            batch_size=32,
            validation_split=0.2,
            verbose=0
        )
        
        # Train cost estimator
        self.models['cost_estimator'].fit(
            X_scaled,
            data[[f'{index}_return' for index in self.cds_indexes]].mean(axis=1)
        )
    
    def calculate_holding_cost(self, hedge_amounts, current_spreads):
        """Calculate holding costs including coupon payments and roll costs"""
        # Convert hedge amounts to notionals
        notionals = hedge_amounts / self.notional
        
        # Coupon payments (in bps, paid quarterly)
        coupon_cost = np.sum([
            notionals[i] * self.coupon_rates[index] * 0.0001 * self.notional / 4  # Quarterly payment
            for i, index in enumerate(self.cds_indexes)
        ])
        
        # Roll costs (20% of spread)
        roll_cost = np.sum([
            notionals[i] * current_spreads[i] * 0.0001 * self.notional * self.roll_cost / 4  # Quarterly
            for i, index in enumerate(self.cds_indexes)
        ])
        
        return coupon_cost + roll_cost
    
    def calculate_cs01(self, hedge_amounts):
        """Calculate CS01 for current hedge positions"""
        # Simplified CS01 calculation
        durations = {
            'ITRX Main': 4,
            'CDX IG': 4,
            'ITRX XOVER': 5,
            'CDX HY': 5,
            'CDX EM': 5
        }
        
        cs01 = {}
        notionals = hedge_amounts / self.notional
        for i, index in enumerate(self.cds_indexes):
            cs01[index] = notionals[i] * durations[index] * 10000  # $ per bp
            
        return cs01
    
    def optimize_hedge(self, current_state):
        """Optimize hedge amounts with holding period constraints"""
        # Predict potential PnL changes without hedge
        current_returns = np.array([current_state[f'{index}_return'] for index in self.cds_indexes])
        
        # Get initial hedge recommendation from NN
        scaled_state = self.scaler.transform([current_state[['PnL'] + [f'{index}_return' for index in self.cds_indexes]])
        hedge_recommendation = self.models['hedge_recommender'].predict(scaled_state)[0]
        
        # Define optimization problem
        def objective(x):
            # x is array of hedge amounts in notional terms
            hedge_amounts = x * self.notional
            
            # Calculate expected PnL protection
            pnl_protection = np.dot(hedge_amounts, current_returns)
            
            # Calculate holding costs
            current_spreads = np.array([current_state[index] for index in self.cds_indexes])
            holding_cost = self.calculate_holding_cost(hedge_amounts, current_spreads)
            
            # We want to maximize protection and minimize cost
            return -pnl_protection + holding_cost
        
        # Constraints
        constraints = []
        
        # CS01 constraints
        def cs01_constraint_func(x, index):
            hedge_amounts = x * self.notional
            cs01 = self.calculate_cs01(hedge_amounts)
            return cs01[index] - self.cs01_constraints[index]
        
        for index in self.cds_indexes:
            constraints.append({
                'type': 'ineq',
                'fun': lambda x, idx=index: cs01_constraint_func(x, idx)
            })
        
        # Max 2 CDS indexes constraint
        constraints.append({'type': 'ineq', 'fun': lambda x: self.max_hedges - np.sum(x > 0.01)})
        
        # Bounds (0-20 units of notional for each hedge)
        bounds = [(0, 20) for _ in self.cds_indexes]
        
        # Initial guess (from NN recommendation)
        x0 = np.clip(hedge_recommendation / np.max(hedge_recommendation) * 10, 0, 20)
        
        # Solve optimization problem
        result = minimize(
            objective,
            x0,
            method='SLSQP',
            bounds=bounds,
            constraints=constraints,
            options={'maxiter': 100}
        )
        
        optimal_hedges = result.x * self.notional
        current_hedges = dict(zip(self.cds_indexes, optimal_hedges))
        
        # Implement holding period logic
        for index in self.cds_indexes:
            if current_hedges[index] > 0:
                self.hedge_durations[index] += 1
                # Force exit if held too long
                if self.hedge_durations[index] > self.max_holding_period:
                    current_hedges[index] = 0
                    self.hedge_durations[index] = 0
            else:
                self.hedge_durations[index] = 0
                
        return current_hedges
    
    def run_backtest(self, data):
        """Backtest the strategy on historical data"""
        portfolio_values = []
        hedge_positions = []
        hedge_costs = []
        
        current_value = self.initial_capital
        current_hedges = {index: 0 for index in self.cds_indexes}
        
        for i in range(1, len(data)):
            current_state = data.iloc[i].to_dict()
            previous_state = data.iloc[i-1].to_dict()
            
            # Update portfolio value from previous PnL
            pnl_change = current_state['PnL'] - previous_state['PnL']
            current_value += pnl_change
            
            # Calculate hedge impact from previous position
            hedge_returns = np.sum([
                current_hedges[index] * current_state[f'{index}_return']
                for index in self.cds_indexes
            ])
            current_value += hedge_returns
            
            # Calculate holding costs
            current_spreads = np.array([current_state[index] for index in self.cds_indexes])
            holding_cost = self.calculate_holding_cost(
                np.array([current_hedges[index] for index in self.cds_indexes]),
                current_spreads
            )
            current_value -= holding_cost
            hedge_costs.append(holding_cost)
            
            # Optimize new hedge position
            optimal_hedges = self.optimize_hedge(current_state)
            current_hedges = optimal_hedges
            
            portfolio_values.append(current_value)
            hedge_positions.append(current_hedges.copy())
            
        return pd.DataFrame({
            'Date': data['Date'].iloc[1:],
            'Portfolio_Value': portfolio_values,
            'Hedge_Positions': hedge_positions,
            'Hedge_Costs': hedge_costs
        })
    
    def visualize_results(self, results):
        """Generate comprehensive visualizations and print summary statistics.

        Shows four figures (portfolio value vs cumulative hedge costs, hedge
        positions over time, buy/sell signals, distribution of holding
        periods) and prints a per-index position summary followed by a
        performance summary.

        Args:
            results: DataFrame with columns 'Date', 'Portfolio_Value',
                'Hedge_Positions' (one dict per row, keyed by CDS index name)
                and 'Hedge_Costs', as returned by ``run_backtest``.
        """
        # Build the position table positionally. results may carry arbitrary
        # index labels, so assigning results['Date'] as a Series to a freshly
        # indexed frame would align by label and shift or drop dates.
        hedge_df = pd.DataFrame(
            results['Hedge_Positions'].tolist(), columns=self.cds_indexes
        )
        hedge_df['Date'] = results['Date'].to_numpy()
        hedge_df.set_index('Date', inplace=True)

        def format_date_axis():
            # Shared date formatting for the time-series figures
            plt.gca().xaxis.set_major_formatter(DateFormatter("%Y-%m"))
            plt.gcf().autofmt_xdate()

        # 1. Portfolio Value vs Hedge Costs
        plt.figure(figsize=(15, 6))
        plt.plot(results['Date'], results['Portfolio_Value'], label='Portfolio Value')
        # The cumulative series has one value per row, so it is plotted
        # against the full date column (x and y must have equal length).
        plt.plot(results['Date'], results['Hedge_Costs'].cumsum(), label='Cumulative Hedge Costs')
        plt.title('Portfolio Value vs Cumulative Hedge Costs')
        plt.ylabel('USD')
        plt.xlabel('Date')
        plt.legend()
        plt.grid(True)
        format_date_axis()
        plt.show()

        # 2. Hedge Positions Over Time
        plt.figure(figsize=(15, 8))
        for index in self.cds_indexes:
            plt.plot(hedge_df.index, hedge_df[index]/self.notional,
                    label=f'{index} (Notional in $10mm)')

        plt.title('CDS Hedge Positions Over Time')
        plt.ylabel('Notional Amount ($10mm units)')
        plt.xlabel('Date')
        plt.legend()
        plt.grid(True)
        format_date_axis()
        plt.show()

        # 3. Buy/Sell Signals: a position increase is a buy, a decrease a sell
        plt.figure(figsize=(15, 8))
        for index in self.cds_indexes:
            changes = hedge_df[index].diff()
            buy_dates = hedge_df.index[changes > 0]
            sell_dates = hedge_df.index[changes < 0]

            if len(buy_dates) > 0:
                plt.scatter(buy_dates, [1]*len(buy_dates),
                          label=f'{index} Buy', marker='^', s=100)
            if len(sell_dates) > 0:
                plt.scatter(sell_dates, [-1]*len(sell_dates),
                          label=f'{index} Sell', marker='v', s=100)

        plt.title('CDS Hedge Buy/Sell Signals')
        plt.yticks([-1, 0, 1], ['Sell', '', 'Buy'])
        plt.xlabel('Date')
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.grid(True)
        format_date_axis()
        plt.show()

        # 4. Hedge Holding Periods: length of each completed run of
        # consecutive rows with a positive position
        plt.figure(figsize=(15, 6))
        holding_periods = []
        current_holding = {index: 0 for index in self.cds_indexes}

        for _, row in hedge_df.iterrows():
            for index in self.cds_indexes:
                if row[index] > 0:
                    current_holding[index] += 1
                else:
                    if current_holding[index] > 0:
                        holding_periods.append(current_holding[index])
                    current_holding[index] = 0

        # Nothing to draw when no hedge was ever closed
        if holding_periods:
            sns.histplot(holding_periods, bins=20, kde=True)
        plt.axvline(self.max_holding_period, color='r', linestyle='--',
                   label='Max Holding Period')
        plt.title('Distribution of Hedge Holding Periods (Days)')
        plt.xlabel('Holding Period (Days)')
        plt.ylabel('Frequency')
        plt.legend()
        plt.show()

        # Print summary statistics
        print("\n=== Hedge Position Summary ===")
        for index in self.cds_indexes:
            is_active = hedge_df[index] > 0
            active_days = is_active.sum()
            # Mean of an empty selection is NaN; report 0 for unused indexes
            if active_days:
                avg_notional = hedge_df.loc[is_active, index].mean() / self.notional
            else:
                avg_notional = 0.0
            print(f"{index}: Active {active_days} days, Avg notional: {avg_notional:.1f} $10mm units")

        print("\n=== Performance Summary ===")
        # Portfolio_Value already has the hedge costs deducted, so the change
        # in value is the net figure; add the costs back for the total.
        net_pnl = results['Portfolio_Value'].iloc[-1] - self.initial_capital
        total_hedge_cost = results['Hedge_Costs'].sum()
        total_pnl = net_pnl + total_hedge_cost
        print(f"Total PnL: ${total_pnl:,.2f}")
        print(f"Total Hedge Costs: ${total_hedge_cost:,.2f}")
        print(f"Net PnL: ${net_pnl:,.2f}")

# Example usage
if __name__ == "__main__":
    # Synthetic sample data (replace with your actual data)
    n_periods = 100
    spread_ranges = {
        'ITRX Main': (50, 150),
        'ITRX XOVER': (200, 400),
        'CDX IG': (50, 150),
        'CDX HY': (300, 500),
        'CDX EM': (200, 400),
    }
    data = {
        'Date': pd.date_range(start='1/1/2007', periods=n_periods),
        'PnL': np.random.normal(0, 10000, n_periods).cumsum(),
    }
    # Spreads are drawn after the PnL path, one index at a time
    for index_name, (low, high) in spread_ranges.items():
        data[index_name] = np.random.uniform(low, high, n_periods)

    sample_frame = pd.DataFrame(data)
    pnl_data = sample_frame[['Date', 'PnL']]
    cds_data = sample_frame.drop(columns='PnL')

    # Build the optimizer, then preprocess -> train -> backtest
    optimizer = CDSHedgingOptimizer(pnl_data, cds_data)
    processed_data = optimizer.preprocess_data()
    optimizer.train_models(processed_data)
    results = optimizer.run_backtest(processed_data)

    # Plot the figures and print the summaries
    optimizer.visualize_results(results)