In [1]:
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

class MarketSignalAnalyzer:
    """
    A class to analyze financial market signals based on combined 
    momentum and mean reversion strategies.
    """
    
    def __init__(self):
        self.results = {}
        
    def calculate_rsi(self, data, window=14):
        """Calculate the Relative Strength Index."""
        delta = data.diff()
        gain = delta.where(delta > 0, 0).rolling(window=window).mean()
        loss = -delta.where(delta < 0, 0).rolling(window=window).mean()
        
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi
    
    def calculate_bollinger_bands(self, data, window=20, num_std=2):
        """Calculate Bollinger Bands."""
        rolling_mean = data.rolling(window=window).mean()
        rolling_std = data.rolling(window=window).std()
        
        upper_band = rolling_mean + (rolling_std * num_std)
        lower_band = rolling_mean - (rolling_std * num_std)
        
        return rolling_mean, upper_band, lower_band
    
    def calculate_atr(self, high, low, close, window=14):
        """Calculate Average True Range."""
        tr1 = high - low
        tr2 = abs(high - close.shift())
        tr3 = abs(low - close.shift())
        
        # Calculate maximum of the three values at each index position
        tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
        atr = tr.rolling(window=window).mean()
        
        return atr
    
    def generate_signal(self, data, rsi_window=14, bb_window=20, 
                       atr_window=14, rsi_upper=70, rsi_lower=30, 
                       atr_threshold=1.5):
        """
        Generate trading signals based on RSI, Bollinger Bands, and ATR.
        
        Signal Rules:
        1. Buy when RSI < rsi_lower AND price < lower BB AND ATR > atr_threshold
        2. Sell when RSI > rsi_upper AND price > upper BB
        """
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['Close']
        
        # Calculate indicators
        signals['rsi'] = self.calculate_rsi(data['Close'], window=rsi_window)
        _, signals['upper_bb'], signals['lower_bb'] = self.calculate_bollinger_bands(
            data['Close'], window=bb_window
        )
        signals['atr'] = self.calculate_atr(
            data['High'], data['Low'], data['Close'], window=atr_window
        )
        signals['atr_pct'] = signals['atr'] / signals['price'] * 100
        
        # Generate signals
        signals['signal'] = 0
        
        # Buy signals (1)
        buy_condition = (
            (signals['rsi'] < rsi_lower) & 
            (signals['price'] < signals['lower_bb']) & 
            (signals['atr_pct'] > atr_threshold)
        )
        signals.loc[buy_condition, 'signal'] = 1
        
        # Sell signals (-1)
        sell_condition = (
            (signals['rsi'] > rsi_upper) & 
            (signals['price'] > signals['upper_bb'])
        )
        signals.loc[sell_condition, 'signal'] = -1
        
        # Create position column (1 for long, -1 for short, 0 for no position)
        signals['position'] = 0
        position = 0
        
        for i in range(len(signals)):
            if signals['signal'].iloc[i] == 1 and position != 1:
                position = 1  # Enter long position
            elif signals['signal'].iloc[i] == -1 and position != -1:
                position = -1  # Enter short position
            
            signals['position'].iloc[i] = position
        
        return signals
    
    def calculate_returns(self, signals):
        """Calculate returns based on positions."""
        # Daily returns of the price
        signals['daily_return'] = signals['price'].pct_change()
        
        # Strategy returns
        signals['strategy_return'] = signals['position'].shift(1) * signals['daily_return']
        
        # Cumulative returns
        signals['cumulative_return'] = (1 + signals['daily_return']).cumprod() - 1
        signals['strategy_cumulative_return'] = (1 + signals['strategy_return']).cumprod() - 1
        
        return signals
    
    def calculate_performance_metrics(self, signals):
        """Calculate performance metrics for the strategy."""
        # Filter out NaN values
        returns = signals['strategy_return'].dropna()
        
        if len(returns) == 0:
            return {
                'total_return': 0,
                'annualized_return': 0,
                'sharpe_ratio': 0,
                'max_drawdown': 0,
                'win_rate': 0,
                'profit_factor': 0
            }
        
        # Calculate trade statistics
        trades = signals[signals['signal'] != 0]['signal']
        num_trades = len(trades)
        
        # Calculate win rate if there are trades
        if num_trades > 0:
            # A trade is considered won if the next opposing signal results in a positive return
            wins = 0
            current_position = 0
            entry_price = 0
            
            for i in range(len(signals)):
                if signals['signal'].iloc[i] == 1 and current_position != 1:
                    current_position = 1
                    entry_price = signals['price'].iloc[i]
                elif signals['signal'].iloc[i] == -1 and current_position == 1:
                    if signals['price'].iloc[i] > entry_price:
                        wins += 1
                    current_position = -1
                    entry_price = signals['price'].iloc[i]
                elif signals['signal'].iloc[i] == 1 and current_position == -1:
                    if signals['price'].iloc[i] < entry_price:
                        wins += 1
                    current_position = 1
                    entry_price = signals['price'].iloc[i]
            
            win_rate = wins / max(1, num_trades - 1)  # Avoid division by zero
        else:
            win_rate = 0
        
        # Calculate total return
        total_return = signals['strategy_cumulative_return'].iloc[-1]
        
        # Calculate annualized return
        n_years = len(signals) / 252  # Assuming 252 trading days per year
        annualized_return = (1 + total_return) ** (1 / max(n_years, 0.01)) - 1
        
        # Calculate Sharpe ratio (assuming risk-free rate of 0)
        sharpe_ratio = (returns.mean() * 252) / (returns.std() * np.sqrt(252))
        
        # Calculate maximum drawdown
        cum_returns = (1 + signals['strategy_return']).cumprod()
        running_max = cum_returns.cummax()
        drawdown = (cum_returns / running_max) - 1
        max_drawdown = drawdown.min()
        
        # Calculate profit factor
        positive_returns = returns[returns > 0].sum()
        negative_returns = abs(returns[returns < 0].sum())
        profit_factor = positive_returns / max(negative_returns, 0.0001)  # Avoid division by zero
        
        # Return metrics as a dictionary
        return {
            'total_return': total_return,
            'annualized_return': annualized_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'win_rate': win_rate,
            'profit_factor': profit_factor,
            'num_trades': num_trades
        }
    
    def download_data(self, ticker, start_date, end_date, interval='1d'):
        """Download historical data for a ticker."""
        data = yf.download(ticker, start=start_date, end=end_date, interval=interval)
        return data
    
    def analyze_instrument(self, ticker, name, start_date, end_date, interval='1d', 
                          rsi_window=14, bb_window=20, atr_window=14, 
                          rsi_upper=70, rsi_lower=30, atr_threshold=1.5):
        """Analyze a financial instrument and calculate performance metrics."""
        print(f"Analyzing {name} ({ticker}) with {interval} data...")
        
        # Download data
        data = self.download_data(ticker, start_date, end_date, interval)
        
        if len(data) == 0:
            print(f"No data available for {ticker} with interval {interval}")
            return None
        
        # Generate signals
        signals = self.generate_signal(
            data, 
            rsi_window=rsi_window, 
            bb_window=bb_window,
            atr_window=atr_window,
            rsi_upper=rsi_upper,
            rsi_lower=rsi_lower,
            atr_threshold=atr_threshold
        )
        
        # Calculate returns
        signals = self.calculate_returns(signals)
        
        # Calculate performance metrics
        metrics = self.calculate_performance_metrics(signals)
        
        # Store results
        self.results[name] = {
            'ticker': ticker,
            'interval': interval,
            'signals': signals,
            'metrics': metrics
        }
        
        return signals, metrics
    
    def plot_results(self, name):
        """Plot price, indicators, and returns for a specific instrument."""
        if name not in self.results:
            print(f"{name} not found in results.")
            return
        
        signals = self.results[name]['signals']
        
        # Create a figure with 3 subplots
        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 16), sharex=True)
        
        # Plot price and Bollinger Bands
        ax1.plot(signals.index, signals['price'], label='Price')
        ax1.plot(signals.index, signals['upper_bb'], 'r--', label='Upper BB')
        ax1.plot(signals.index, signals['lower_bb'], 'g--', label='Lower BB')
        
        # Plot buy and sell signals
        ax1.plot(signals[signals['signal'] == 1].index, 
                signals['price'][signals['signal'] == 1], 
                '^', markersize=10, color='g', label='Buy Signal')
        ax1.plot(signals[signals['signal'] == -1].index, 
                signals['price'][signals['signal'] == -1], 
                'v', markersize=10, color='r', label='Sell Signal')
        
        ax1.set_title(f'{name} - Price and Signals')
        ax1.set_ylabel('Price')
        ax1.legend()
        ax1.grid(True)
        
        # Plot RSI
        ax2.plot(signals.index, signals['rsi'], label='RSI')
        ax2.axhline(y=70, color='r', linestyle='--', label='RSI Upper')
        ax2.axhline(y=30, color='g', linestyle='--', label='RSI Lower')
        ax2.set_title('RSI')
        ax2.set_ylabel('RSI')
        ax2.legend()
        ax2.grid(True)
        
        # Plot strategy cumulative returns vs buy and hold
        ax3.plot(signals.index, signals['strategy_cumulative_return'], 
                 label='Strategy Returns')
        ax3.plot(signals.index, signals['cumulative_return'], 
                 label='Buy and Hold Returns')
        ax3.set_title('Cumulative Returns')
        ax3.set_ylabel('Returns')
        ax3.legend()
        ax3.grid(True)
        
        plt.tight_layout()
        plt.savefig(f"{name.replace('/', '_')}_analysis.png")
        plt.close()

    def run_analysis(self):
        """Run analysis for multiple instruments."""
        end_date = datetime.now()
        start_date = end_date - timedelta(days=365*3)  # 3 years of data
        
        # Define instruments to analyze
        instruments = [
            {"ticker": "^GSPC", "name": "S&P 500", "interval": "1d"},
            {"ticker": "EURUSD=X", "name": "EUR/USD", "interval": "1h"},
            {"ticker": "^TNX", "name": "10-Year Treasury Yield", "interval": "1d"},
            {"ticker": "AAPL", "name": "Apple", "interval": "15m"},
            {"ticker": "GC=F", "name": "Gold Futures", "interval": "1mo"}
        ]
        
        # Different parameter sets for different markets
        parameters = {
            "S&P 500": {"rsi_window": 14, "bb_window": 20, "atr_window": 14, 
                      "rsi_upper": 70, "rsi_lower": 30, "atr_threshold": 1.0},
            "EUR/USD": {"rsi_window": 21, "bb_window": 20, "atr_window": 14, 
                       "rsi_upper": 75, "rsi_lower": 25, "atr_threshold": 0.5},
            "10-Year Treasury Yield": {"rsi_window": 10, "bb_window": 20, "atr_window": 10, 
                                    "rsi_upper": 75, "rsi_lower": 25, "atr_threshold": 2.0},
            "Apple": {"rsi_window": 9, "bb_window": 15, "atr_window": 10, 
                    "rsi_upper": 70, "rsi_lower": 30, "atr_threshold": 1.5},
            "Gold Futures": {"rsi_window": 14, "bb_window": 20, "atr_window": 14, 
                           "rsi_upper": 70, "rsi_lower": 30, "atr_threshold": 2.0}
        }
        
        # Analyze each instrument
        for instrument in instruments:
            # Adjust timeframe for intraday data
            if instrument["interval"] in ["15m", "1h"]:
                start_date_intraday = end_date - timedelta(days=60)  # 60 days for intraday data
                s_date = start_date_intraday
            else:
                s_date = start_date
            
            params = parameters[instrument["name"]]
            self.analyze_instrument(
                instrument["ticker"], 
                instrument["name"], 
                s_date, 
                end_date, 
                instrument["interval"],
                **params
            )
            
            # Plot results
            self.plot_results(instrument["name"])
        
        # Compare performance across instruments
        self.compare_performance()
    
    def compare_performance(self):
        """Compare performance metrics across instruments."""
        performance_df = pd.DataFrame()
        
        for name, result in self.results.items():
            metrics = result['metrics']
            performance_df = performance_df.append(
                {
                    'Instrument': name,
                    'Interval': result['interval'],
                    'Total Return': metrics['total_return'],
                    'Annualized Return': metrics['annualized_return'],
                    'Sharpe Ratio': metrics['sharpe_ratio'],
                    'Max Drawdown': metrics['max_drawdown'],
                    'Win Rate': metrics['win_rate'],
                    'Profit Factor': metrics['profit_factor'],
                    'Number of Trades': metrics['num_trades']
                },
                ignore_index=True
            )
        
        # Format display
        pd.set_option('display.float_format', '{:.2%}'.format)
        formatted_df = performance_df.copy()
        formatted_df['Total Return'] = performance_df['Total Return']
        formatted_df['Annualized Return'] = performance_df['Annualized Return']
        formatted_df['Sharpe Ratio'] = performance_df['Sharpe Ratio'].map('{:.2f}'.format)
        formatted_df['Max Drawdown'] = performance_df['Max Drawdown']
        formatted_df['Win Rate'] = performance_df['Win Rate']
        formatted_df['Profit Factor'] = performance_df['Profit Factor'].map('{:.2f}'.format)
        
        print("\nPerformance Comparison:")
        print(formatted_df)
        
        # Create a performance comparison bar chart
        metrics_to_plot = ['Total Return', 'Sharpe Ratio', 'Win Rate']
        
        fig, axes = plt.subplots(len(metrics_to_plot), 1, figsize=(10, 12))
        
        for i, metric in enumerate(metrics_to_plot):
            if metric == 'Sharpe Ratio':
                values = performance_df[metric].astype(float)
            else:
                values = performance_df[metric]
                
            axes[i].bar(performance_df['Instrument'], values)
            axes[i].set_title(f'Comparison of {metric}')
            axes[i].set_ylabel(metric)
            axes[i].set_xticklabels(performance_df['Instrument'], rotation=45)
            
        plt.tight_layout()
        plt.savefig("performance_comparison.png")
        plt.close()
        
        return formatted_df

# Run the analysis
if __name__ == "__main__":
    analyzer = MarketSignalAnalyzer()
    analyzer.run_analysis()

Analyzing S&P 500 (^GSPC) with 1d data...
YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed

Analyzing EUR/USD (EURUSD=X) with 1h data...



[*********************100%***********************]  1 of 1 completed

Analyzing 10-Year Treasury Yield (^TNX) with 1d data...



[*********************100%***********************]  1 of 1 completed

Analyzing Apple (AAPL) with 15m data...



[*********************100%***********************]  1 of 1 completed

Analyzing Gold Futures (GC=F) with 1mo data...






Performance Comparison:
               Instrument Interval  Total Return  Annualized Return  \
0                 S&P 500       1d        16.26%              5.18%   
1                 EUR/USD       1h        -0.25%             -0.06%   
2  10-Year Treasury Yield       1d        12.54%              4.03%   
3                   Apple      15m         0.72%              0.18%   
4            Gold Futures      1mo       -25.06%            -90.41%   

  Sharpe Ratio  Max Drawdown  Win Rate Profit Factor  Number of Trades  
0         0.39       -15.35%    10.71%          1.07                29  
1        -0.03        -3.23%     0.00%          0.99                13  
2         0.28       -40.72%    11.11%          1.05                37  
3         0.06       -12.56%     0.00%          1.01                43  
4        -6.23       -25.06%     0.00%          0.10                 6  
