In [1]:
# FOR CALCULATING MONTHLY HEDGE FUND PERFORMANCE METRICS

import pandas as pd
import numpy as np
import os
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

class PerformanceAnalyzer:
    def __init__(self, df, date_col='Date', return_col='MTD'):
        """        
        Parameters:
        df: DataFrame with date and return columns
        date_col: Name of the date column
        return_col: Name of the return column
        """
        self.df = df.copy()
        self.df[date_col] = pd.to_datetime(self.df[date_col])
        
        # Convert return column to numeric, handling percentages and other formats
        self.df[return_col] = self._convert_to_numeric(self.df[return_col])
        
        # Remove any rows with NaN in returns after conversion
        self.df = self.df.dropna(subset=[return_col])
        
        self.df = self.df.sort_values(date_col).reset_index(drop=True)
        self.date_col = date_col
        self.return_col = return_col
        
    def _convert_to_numeric(self, series):
        """
        Convert series to numeric, handling percentage signs and other common formats
        """
        # If already numeric, return as is
        if pd.api.types.is_numeric_dtype(series):
            return series
        
        # Convert to string and clean
        series = series.astype(str)
        
        # Remove percentage signs and other common characters
        series = series.str.replace('%', '', regex=False)
        series = series.str.replace(',', '', regex=False)
        series = series.str.replace('$', '', regex=False)
        series = series.str.replace(' ', '', regex=False)
        
        # Convert to numeric, coercing errors to NaN
        series = pd.to_numeric(series, errors='coerce')
        
        # If values are large (like 10 for 10%), assume they're percentages and divide by 100
        if series.abs().max() > 1:
            series = series / 100
            
        return series
    
    def get_period_data(self, years):
        """
        Get data for specified number of years back from latest date
        Returns exactly years * 12 most recent months
        """
        if years == 'since_inception':
            return self.df

        # Sort by date to ensure we have the most recent data
        sorted_df = self.df.sort_values(self.date_col, ascending=False)

        # Get exactly years * 12 most recent months
        n_months = years * 12
        if n_months > len(sorted_df):
            n_months = len(sorted_df)  # Don't exceed available data

        period_data = sorted_df.head(n_months)

        # Return in chronological order
        return period_data.sort_values(self.date_col, ascending=True)
    
    def calculate_cumulative_return(self, returns):
        """Calculate cumulative return"""
        return (1 + returns).prod() - 1
    
    def calculate_annualized_return(self, returns):
        """Calculate annualized return"""
        if len(returns) < 1:
            return 0
        cumulative_return = self.calculate_cumulative_return(returns)
        years = len(returns) / 12  # Assuming monthly data
        if years <= 0:
            return 0
        return (1 + cumulative_return) ** (1/years) - 1
    
    def calculate_annualized_volatility(self, returns):
        """Calculate annualized volatility"""
        if len(returns) < 2:
            return 0
        return returns.std() * np.sqrt(12)
    
    def calculate_upside_deviation(self, returns, mar=0):
        """Calculate upside deviation (semi-deviation above MAR)"""
        if len(returns) < 2:
            return 0
        excess_returns = returns - mar
        upside_returns = excess_returns[excess_returns > 0]
        if len(upside_returns) == 0:
            return 0
        return upside_returns.std() * np.sqrt(12)
    
    def calculate_downside_deviation(self, returns, mar=0):
        """Calculate downside deviation (semi-deviation below MAR)"""
        if len(returns) < 2:
            return 0
        excess_returns = returns - mar
        downside_returns = excess_returns[excess_returns < 0]
        if len(downside_returns) == 0:
            return 0
        return downside_returns.std() * np.sqrt(12)
    
    def calculate_max_drawdown(self, returns):
        """Calculate maximum drawdown"""
        if len(returns) < 1:
            return 0
        cumulative_returns = (1 + returns).cumprod()
        running_max = cumulative_returns.expanding().max()
        drawdowns = (cumulative_returns - running_max) / running_max
        return drawdowns.min()
    
    def calculate_sharpe_ratio(self, returns, risk_free_rate=0):
        """Calculate Sharpe ratio"""
        if len(returns) < 2:
            return 0
        excess_returns = returns - risk_free_rate/12
        annualized_excess_return = self.calculate_annualized_return(excess_returns)
        annualized_vol = self.calculate_annualized_volatility(returns)
        return annualized_excess_return / annualized_vol if annualized_vol != 0 else 0
    
    def calculate_sortino_ratio(self, returns, risk_free_rate=0, mar=0):
        """Calculate Sortino ratio"""
        if len(returns) < 2:
            return 0
        excess_returns = returns - risk_free_rate/12
        annualized_excess_return = self.calculate_annualized_return(excess_returns)
        downside_dev = self.calculate_downside_deviation(returns, mar)
        return annualized_excess_return / downside_dev if downside_dev != 0 else 0
    
    def calculate_calmar_ratio(self, returns):
        """Calculate Calmar ratio"""
        if len(returns) < 12:  # Need at least 1 year of data
            return 0
        annualized_return = self.calculate_annualized_return(returns)
        max_drawdown = abs(self.calculate_max_drawdown(returns))
        return annualized_return / max_drawdown if max_drawdown != 0 else 0
    
    def calculate_win_rate(self, returns):
        """Calculate win rate (percentage of positive returns)"""
        if len(returns) == 0:
            return 0
        winning_months = len(returns[returns > 0])
        return winning_months / len(returns)
    
    def calculate_average_return(self, returns):
        """Calculate average monthly return"""
        if len(returns) == 0:
            return 0
        return returns.mean()
    
    def calculate_average_gain(self, returns):
        """Calculate average gain (positive returns only)"""
        positive_returns = returns[returns > 0]
        return positive_returns.mean() if len(positive_returns) > 0 else 0
    
    def calculate_average_loss(self, returns):
        """Calculate average loss (negative returns only)"""
        negative_returns = returns[returns < 0]
        return negative_returns.mean() if len(negative_returns) > 0 else 0
    
    def analyze_period(self, period_name, risk_free_rate=0, mar=0):
        """
        Calculate all performance metrics for a given period
        """
        if period_name == '1_year':
            period_data = self.get_period_data(1)
        elif period_name == '3_year':
            period_data = self.get_period_data(3)
        elif period_name == '5_year':
            period_data = self.get_period_data(5)
        elif period_name == '10_year':
            period_data = self.get_period_data(10)
        else:  # since_inception
            period_data = self.get_period_data('since_inception')
        
        if len(period_data) < 2:
            print(f"Warning: Not enough data for {period_name} period")
            return None
            
        returns = period_data[self.return_col]
        
        # Debug info
        print(f"{period_name}: {len(returns)} months, returns range: [{returns.min():.4f}, {returns.max():.4f}]")
        
        results = {
            'Period': period_name.replace('_', ' ').title(),
            'Months': len(returns),
            'Cumulative Return': self.calculate_cumulative_return(returns),
            'Annualized Return': self.calculate_annualized_return(returns),
            'Annualized Volatility': self.calculate_annualized_volatility(returns),
            'Upside Deviation': self.calculate_upside_deviation(returns, mar),
            'Downside Deviation': self.calculate_downside_deviation(returns, mar),
            'Max Drawdown': self.calculate_max_drawdown(returns),
            'Sharpe Ratio': self.calculate_sharpe_ratio(returns, risk_free_rate),
            'Sortino Ratio': self.calculate_sortino_ratio(returns, risk_free_rate, mar),
            'Calmar Ratio': self.calculate_calmar_ratio(returns),
            'Win Rate': self.calculate_win_rate(returns),
            'Average Return': self.calculate_average_return(returns),
            'Average Gain': self.calculate_average_gain(returns),
            'Average Loss': self.calculate_average_loss(returns)
        }
        
        return results
    
    def generate_report(self, risk_free_rate=0, mar=0):
        """
        Generate comprehensive performance report for all periods
        """
        periods = ['1_year', '3_year', '5_year', '10_year', 'since_inception']
        report_data = []
        
        for period in periods:
            results = self.analyze_period(period, risk_free_rate, mar)
            if results:
                report_data.append(results)
        
        if not report_data:
            print("No data available for analysis")
            return pd.DataFrame(), pd.DataFrame()
        
        report_df = pd.DataFrame(report_data)
        
        # Format the report
        format_dict = {
            'Cumulative Return': '{:.2%}',
            'Annualized Return': '{:.2%}',
            'Annualized Volatility': '{:.2%}',
            'Upside Deviation': '{:.2%}',
            'Downside Deviation': '{:.2%}',
            'Max Drawdown': '{:.2%}',
            'Sharpe Ratio': '{:.2f}',
            'Sortino Ratio': '{:.2f}',
            'Calmar Ratio': '{:.2f}',
            'Win Rate': '{:.2%}',
            'Average Return': '{:.2%}',
            'Average Gain': '{:.2%}',
            'Average Loss': '{:.2%}'
        }
        
        styled_df = report_df.copy()
        for col, fmt in format_dict.items():
            if col in styled_df.columns:
                styled_df[col] = styled_df[col].apply(lambda x: fmt.format(x) if pd.notnull(x) else 'N/A')
        
        return report_df, styled_df

def get_file_path():
    """
    Helper function to get file path with proper handling
    """
    while True:
        file_path = input("Enter the path to your CSV file: ").strip().strip('"')
        
        # Replace backslashes with forward slashes for Mac compatibility
        file_path = file_path.replace('\\', '/')
        
        # Expand ~ to home directory
        file_path = os.path.expanduser(file_path)
        
        # Check if file exists
        if os.path.exists(file_path):
            return file_path
        else:
            print(f"File not found: {file_path}")
            print("Please check the path and try again.")
            print("Common issues:")
            print("- Use forward slashes (/) not backslashes (\\)")
            print("- For files on Desktop, try: ~/Desktop/armistice_mtd_returns.csv")
            print("- Or use the full path like: /Users/YourUsername/Desktop/armistice_mtd_returns.csv")
            print()

def inspect_data(df):
    """
    Inspect the data and provide information about columns and data types
    """
    print("\n" + "="*50)
    print("DATA INSPECTION")
    print("="*50)
    print(f"Total rows: {len(df)}")
    print(f"Columns: {df.columns.tolist()}")
    print("\nData types:")
    print(df.dtypes)
    print("\nFirst 5 rows:")
    print(df.head())
    print("\nLast 5 rows:")
    print(df.tail())
    
    # Check for any non-numeric values in potential return columns
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    non_numeric_cols = df.select_dtypes(exclude=[np.number]).columns
    
    print(f"\nNumeric columns: {list(numeric_cols)}")
    print(f"Non-numeric columns: {list(non_numeric_cols)}")
    
    for col in non_numeric_cols:
        if any(keyword in col.lower() for keyword in ['return', 'mtd', 'ret', 'performance']):
            print(f"\nSample values from '{col}':")
            print(df[col].head(10).tolist())

def main():
    """
    Main function to run the performance analysis
    """
    # Read CSV file
    try:
        file_path = get_file_path()
        df = pd.read_csv(file_path)
        print(f"Successfully loaded data with {len(df)} rows")
        print(f"Columns: {df.columns.tolist()}")
        
        # Inspect the data
        inspect_data(df)
        
    except Exception as e:
        print(f"Error reading file: {e}")
        return
    
    # Get column names
    date_col = input("\nEnter the name of the date column (press Enter for 'Date'): ").strip()
    if not date_col:
        date_col = 'Date'
    
    return_col = input("Enter the name of the return column (press Enter for 'MTD'): ").strip()
    if not return_col:
        return_col = 'MTD'
    
    # Verify columns exist
    if date_col not in df.columns:
        print(f"Error: Date column '{date_col}' not found in CSV")
        print(f"Available columns: {df.columns.tolist()}")
        return
        
    if return_col not in df.columns:
        print(f"Error: Return column '{return_col}' not found in CSV")
        print(f"Available columns: {df.columns.tolist()}")
        return
    
    # Get risk-free rate
    risk_free_input = input("Enter risk-free rate (annual, as percentage, e.g., 2.5 for 2.5%; press Enter for 0%): ").strip()
    risk_free_rate = float(risk_free_input) / 100 if risk_free_input else 0
    
    # Get MAR (Minimum Acceptable Return)
    mar_input = input("Enter Minimum Acceptable Return for Sortino (annual, as percentage, press Enter for 0%): ").strip()
    mar = float(mar_input) / 100 if mar_input else 0
    
    # Initialize analyzer
    analyzer = PerformanceAnalyzer(df, date_col, return_col)
    
    # Check if we have any data left after cleaning
    if len(analyzer.df) == 0:
        print("Error: No valid data remaining after cleaning. Please check your return column format.")
        return
    
    print(f"\nData after cleaning: {len(analyzer.df)} rows")
    print(f"Date range: {analyzer.df[date_col].min()} to {analyzer.df[date_col].max()}")
    print(f"Return statistics: min={analyzer.df[return_col].min():.4f}, max={analyzer.df[return_col].max():.4f}, mean={analyzer.df[return_col].mean():.4f}")
    
    # Generate report
    raw_results, formatted_results = analyzer.generate_report(risk_free_rate, mar)
    
    print("\n" + "="*80)
    print("PERFORMANCE METRICS REPORT")
    print("="*80)
    print(f"Risk-Free Rate: {risk_free_rate:.2%}")
    print(f"Minimum Acceptable Return: {mar:.2%}")
    print("="*80)
    
    if len(formatted_results) > 0:
        # Display formatted results
        pd.set_option('display.width', 1000)
        pd.set_option('display.max_columns', None)
        print(formatted_results.to_string(index=False))
        
        # Enhanced save option with multiple choices
        save_option = input("\nDo you want to save the results to a CSV file? (y/n): ").strip().lower()
        if save_option == 'y':
            print("\nSave options:")
            print("1. Save to current working directory")
            print("2. Save to same folder as input file")
            print("3. Specify custom path")
            
            choice = input("Choose option (1/2/3): ").strip()
            
            default_name = 'performance_report.csv'
            
            if choice == '1':
                # Option 1: Current working directory
                output_file = input(f"Enter filename (press Enter for '{default_name}'): ").strip()
                if not output_file:
                    output_file = default_name
                full_path = os.path.abspath(output_file)
                
            elif choice == '2':
                # Option 2: Same folder as input file
                input_dir = os.path.dirname(file_path)
                output_file = input(f"Enter filename (press Enter for '{default_name}'): ").strip()
                if not output_file:
                    output_file = default_name
                full_path = os.path.join(input_dir, output_file)
                
            elif choice == '3':
                # Option 3: Custom path
                custom_path = input("Enter full path for the file: ").strip()
                full_path = os.path.expanduser(custom_path)
                
            else:
                print("Invalid choice. Using current directory.")
                output_file = default_name
                full_path = os.path.abspath(output_file)
            
            # Show confirmation and save
            print(f"\nFile will be saved to: {full_path}")
            confirm = input("Is this OK? (y/n): ").strip().lower()
            if confirm == 'y':
                raw_results.to_csv(full_path, index=False)
                print(f"Results saved to {full_path}")
            else:
                print("Save cancelled")
    else:
        print("No results to display")

if __name__ == "__main__":
    main()

Enter the path to your CSV file:  /Users/kaellymac13/Desktop/armistice_mtd_returns.csv


Successfully loaded data with 133 rows
Columns: ['Date', 'MTD']

DATA INSPECTION
Total rows: 133
Columns: ['Date', 'MTD']

Data types:
Date    object
MTD     object
dtype: object

First 5 rows:
        Date     MTD
0  31-Jul-12   6.00%
1  31-Aug-12   6.10%
2  30-Sep-12   9.40%
3  31-Oct-12  -3.20%
4  30-Nov-12  -3.20%

Last 5 rows:
          Date     MTD
128  31-Mar-23   3.60%
129  30-Apr-23   4.10%
130  31-May-23  -4.40%
131  30-Jun-23  -1.30%
132  31-Jul-23  -1.20%

Numeric columns: []
Non-numeric columns: ['Date', 'MTD']

Sample values from 'MTD':
['6.00%', '6.10%', '9.40%', '-3.20%', '-3.20%', '6.70%', '5.00%', '0.50%', '3.40%', '5.00%']



Enter the name of the date column (press Enter for 'Date'):  
Enter the name of the return column (press Enter for 'MTD'):  
Enter risk-free rate (annual, as percentage, e.g., 2.5 for 2.5%; press Enter for 0%):  
Enter Minimum Acceptable Return for Sortino (annual, as percentage, press Enter for 0%):  



Data after cleaning: 133 rows
Date range: 2012-07-31 00:00:00 to 2023-07-31 00:00:00
Return statistics: min=-0.1440, max=0.2390, mean=0.0212
1_year: 12 months, returns range: [-0.0440, 0.0910]
3_year: 36 months, returns range: [-0.0510, 0.1600]
5_year: 60 months, returns range: [-0.1070, 0.2390]
10_year: 120 months, returns range: [-0.1440, 0.2390]
since_inception: 133 months, returns range: [-0.1440, 0.2390]

PERFORMANCE METRICS REPORT
Risk-Free Rate: 0.00%
Minimum Acceptable Return: 0.00%
         Period  Months Cumulative Return Annualized Return Annualized Volatility Upside Deviation Downside Deviation Max Drawdown Sharpe Ratio Sortino Ratio Calmar Ratio Win Rate Average Return Average Gain Average Loss
         1 Year      12            13.75%            13.75%                15.97%           10.01%              4.61%       -6.78%         0.86          2.98         2.03   50.00%          1.17%        5.08%       -2.73%
         3 Year      36            65.71%            18.34%  


Do you want to save the results to a CSV file? (y/n):  y



Save options:
1. Save to current working directory
2. Save to same folder as input file
3. Specify custom path


Choose option (1/2/3):  1
Enter filename (press Enter for 'performance_report.csv'):  



File will be saved to: /Users/kaellymac13/Desktop/4. PROFESSIONAL/3. Product Development/Code/performance_report.csv


Is this OK? (y/n):  y


Results saved to /Users/kaellymac13/Desktop/4. PROFESSIONAL/3. Product Development/Code/performance_report.csv
