## Dependencies

In [1]:
import pandas as pd 
import numpy as np
from dotenv import load_dotenv
import os
import requests
from datetime import datetime, timedelta
import re
import pandas_ta as ta
import warnings
import plotly.io as pio
import plotly.graph_objs as go
from plotly.subplots import make_subplots

# Select Plotly's 'browser' renderer as the default for every figure shown below.
pio.renderers.default = 'browser'
# Silence every Python warning for the rest of the session.
# NOTE(review): this also hides pandas deprecation warnings - consider narrowing the filter.
warnings.filterwarnings('ignore')

# python-dotenv: load variables from a .env file into the environment; the classes
# below read the API key and the metric endpoints through os.getenv.
load_dotenv()

## Fetching and Merging Raw Data

In [2]:
class DataManager:
    """Fetch Glassnode BTC metrics and merge them into one time-indexed DataFrame.

    The API key is read from the ``GLASSNODE_API_KEY`` environment variable. The
    metric endpoints are read from the environment variables named in
    ``get_trigger_data`` and ``get_context_data``.
    """

    BASE_URL = 'https://api.glassnode.com/v1/metrics'
    # Seconds to wait for the API before giving up, so a stalled connection
    # cannot hang the notebook kernel indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.api_key = os.getenv('GLASSNODE_API_KEY')

    @staticmethod
    def datetime_to_unix(date):
        """Convert a datetime object to a Unix timestamp."""
        return int(date.timestamp())

    def _fetch_glassnode_data(self, endpoint, start, end, frequency):
        """Download one metric and return it as a DataFrame.

        Args:
            endpoint: Path below ``BASE_URL``; its last path segment becomes the
                name of the value column.
            start: datetime sent as the ``s`` (since) parameter.
            end: datetime sent as the ``u`` (until) parameter.
            frequency: Resolution string sent as the ``i`` parameter (e.g. ``'1h'``).

        Returns:
            DataFrame whose ``t`` column (when present) is converted from Unix
            seconds to datetimes and whose ``v`` column is renamed after the endpoint.

        Raises:
            ValueError: If ``endpoint`` is empty or ``None`` (typically an unset
                environment variable).
            Exception: If the API answers with a status other than 200.
        """
        # Fail fast with a clear message instead of requesting ".../None".
        if not endpoint:
            raise ValueError(
                "Glassnode endpoint is missing - check that the corresponding "
                "environment variable is set"
            )

        params = {
            'a': 'BTC',
            's': self.datetime_to_unix(start),
            'u': self.datetime_to_unix(end),
            'i': frequency,
            'f': 'JSON',
            'api_key': self.api_key
        }

        url = f"{self.BASE_URL}/{endpoint}"
        # Last path segment; also works for an endpoint that contains no '/'.
        name = endpoint.rsplit('/', 1)[-1]

        response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)

        if response.status_code != 200:
            raise Exception(f"Failed to fetch data: {response.status_code} - {response.text}")

        df = pd.DataFrame(response.json())
        if 't' in df.columns:
            df['t'] = pd.to_datetime(df['t'], unit='s')
        return df.rename(columns={'v': name})

    def _fetch_and_merge_glassnode_data(self, endpoints, start, end, frequency):
        """Fetch every endpoint and outer-join the results on their timestamps.

        Endpoints that return no rows are reported with ``print`` and skipped.

        Returns:
            DataFrame indexed by ``t`` (sorted ascending) with one column per
            metric, or ``None`` when no endpoint returned any data.
        """
        data_frames = []

        for endpoint in endpoints:
            df = self._fetch_glassnode_data(endpoint, start, end, frequency)
            if df is not None and not df.empty:
                data_frames.append(df.set_index('t'))
            else:
                print(f"Failed to fetch data for endpoint: {endpoint}")

        if not data_frames:
            return None

        merged_df = pd.concat(data_frames, axis=1, join='outer')
        merged_df.index.name = 't'
        # Downstream as-of merges require monotonically increasing timestamps.
        return merged_df.sort_index()

    def get_trigger_data(self, start, end, frequency='1h'):
        """Fetch the metrics used for trigger points.

        Endpoints come from the ``BTC_PRICE``, ``SSR`` and ``SUPPLY_IN_PROFIT``
        environment variables.
        """
        short_term_endpoints = [
            os.getenv('BTC_PRICE'),
            os.getenv('SSR'),
            os.getenv('SUPPLY_IN_PROFIT')
        ]
        return self._fetch_and_merge_glassnode_data(short_term_endpoints, start, end, frequency)

    def get_context_data(self, start, end, frequency='24h'):
        """Fetch the metrics used for context determination.

        Endpoints come from the ``BTC_PRICE``, ``BTC_REALIZED_PRICE``,
        ``PUELL_MULTIPLE``, ``MVRV_Z_SCORE``, ``ENTITY_ADJ_NUPL``,
        ``ENTITY_ADJ_DORMANCY_FLOW`` and ``SUPPLY_IN_PROFIT`` environment variables.
        """
        contextual_endpoints = [
            os.getenv('BTC_PRICE'),
            os.getenv('BTC_REALIZED_PRICE'),
            os.getenv('PUELL_MULTIPLE'),
            os.getenv('MVRV_Z_SCORE'),
            os.getenv('ENTITY_ADJ_NUPL'),
            os.getenv('ENTITY_ADJ_DORMANCY_FLOW'),
            os.getenv('SUPPLY_IN_PROFIT')
        ]
        return self._fetch_and_merge_glassnode_data(contextual_endpoints, start, end, frequency)
    
    


In [3]:
class TradingStrategy:
    """Turn fetched metrics into trigger signals, a market context and trade actions.

    ``data_manager`` must expose ``get_trigger_data(start, end)`` and
    ``get_context_data(start, end)``, each returning a DataFrame indexed by
    timestamp (index named ``t``).
    """

    def __init__(self, data_manager):
        self.data_manager = data_manager

    def compute_triggers(self, start, end):
        """Compute the SSR and supply-in-profit trigger signals.

        Window lengths are expressed in rows. The multiples of 24 used below
        assume the hourly rows that ``get_trigger_data`` requests by default.

        Args:
            start: First date to keep in the result, as a ``'%Y-%m-%d'`` string.
            end: Last date to fetch, as a ``'%Y-%m-%d'`` string.

        Returns:
            DataFrame restricted to rows at or after ``start``. It holds the
            fetched columns, the derived indicators and the 0/1 columns
            ``ssr_buy_signal``, ``ssr_sell_signal``, ``profit_buy_signal`` and
            ``profit_sell_signal``.

        Raises:
            ValueError: If the data manager returns no data.
        """
        start = datetime.strptime(start, '%Y-%m-%d')
        end = datetime.strptime(end, '%Y-%m-%d')

        # 8640 hours (360 days) of extra history warm up the rolling windows.
        trigger_data = self.data_manager.get_trigger_data((start - timedelta(hours=8640)), end)
        if trigger_data is None:
            raise ValueError("No trigger data returned for the requested period")

        trigger_data['rsi_ssr_smoothed'] = ta.ema(ta.rsi(trigger_data['ssr_oscillator'], length=336), length=800)
        trigger_data['rsi_ssr_smoothed_median'] = trigger_data['rsi_ssr_smoothed'].rolling(window=240).median()
        trigger_data['pct_profit_sma30'] = trigger_data['profit_relative'].rolling(30 * 24).mean()
        trigger_data['acc_dacc_pct_profit'] = trigger_data['pct_profit_sma30'].pct_change(30 * 24)

        # Signal calculation. Comparisons against NaN are False, so rows still
        # inside the warm-up period get 0 for both the buy and the sell signal.
        smoothed = trigger_data['rsi_ssr_smoothed']
        smoothed_median = trigger_data['rsi_ssr_smoothed_median']
        trigger_data['ssr_buy_signal'] = np.where(smoothed_median < smoothed, 1, 0)
        trigger_data['ssr_sell_signal'] = np.where(smoothed_median > smoothed, 1, 0)

        weekly_change = trigger_data['acc_dacc_pct_profit'].diff(24 * 7)
        trigger_data['profit_buy_signal'] = np.where(weekly_change > 0, 1, 0)
        trigger_data['profit_sell_signal'] = np.where(weekly_change <= 0, 1, 0)

        # Drop the warm-up rows; copy so later column assignments do not act on a view.
        return trigger_data[trigger_data.index >= start].copy()

    def compute_context(self, start, end):
        """Compute top/bottom detections and the resulting bull/bear context.

        Args:
            start: First date to keep in the result, as a ``'%Y-%m-%d'`` string.
            end: Last date to fetch, as a ``'%Y-%m-%d'`` string.

        Returns:
            DataFrame restricted to rows at or after ``start``. It adds
            ``28d_mkt_gradient``, ``mayer_multiple``, ``price_profit_corr``,
            ``top_detection`` and ``bottom_detection`` (price on detection rows,
            0 elsewhere) and ``context`` (``'bull'``, ``'bear'`` or NaN before the
            first detection, forward-filled).

        Raises:
            ValueError: If the data manager returns no data.
        """
        start = datetime.strptime(start, '%Y-%m-%d')
        end = datetime.strptime(end, '%Y-%m-%d')

        # 360 extra days warm up the 200-row moving average and the expanding statistics.
        context_data = self.data_manager.get_context_data((start - timedelta(days=360)), end)
        if context_data is None:
            raise ValueError("No context data returned for the requested period")

        price = context_data['price_usd_close']
        realized = context_data['price_realized_usd']

        # Expanding z-score of the 28-row change in (price - realized price).
        spread_change = price.diff(28) - realized.diff(28)
        context_data['28d_mkt_gradient'] = (
            (spread_change - spread_change.expanding().mean()) / spread_change.expanding().std()
        )

        context_data['mayer_multiple'] = price / price.rolling(200).mean()
        context_data['price_profit_corr'] = price.rolling(7).corr(context_data['profit_relative'])

        nupl = context_data['net_unrealized_profit_loss_account_based']

        # Detect market tops and bottoms: every condition must hold on the same row.
        # The detection columns carry the price on flagged rows and 0 elsewhere.
        is_top = (
            (context_data['mvrv_z_score'] > 3.8)
            & (context_data['mayer_multiple'] >= 1.3)
            & (nupl >= 0.6)
            & (context_data['28d_mkt_gradient'] >= 7)
        )
        context_data['top_detection'] = is_top.astype(int) * price

        is_bottom = (
            (context_data['mvrv_z_score'] <= 0)
            & (context_data['mayer_multiple'] <= 0.8)
            & (price <= realized)
            & (nupl <= 0)
            & (context_data['puell_multiple'] <= 0.5)
            & (context_data['dormancy_flow'] <= 200000)
        )
        context_data['bottom_detection'] = is_bottom.astype(int) * price

        # Apply context determination, then carry the last known context forward.
        # Assigning the result (instead of a chained in-place fillna) also works
        # when pandas copy-on-write is active.
        context_data['context'] = context_data.apply(self.determine_context, axis=1)
        context_data['context'] = context_data['context'].ffill()

        return context_data[context_data.index >= start]

    @staticmethod
    def determine_context(row):
        """Classify one row as ``'bull'``, ``'bear'`` or undetermined (NaN).

        A bottom detection without a top detection starts a bull context; a top
        detection without a bottom detection starts a bear context. Rows with
        neither detection, or with both flagged at once (including both NaN),
        return NaN so the caller's forward fill keeps the previous context.
        """
        bottom = row['bottom_detection']
        top = row['top_detection']
        if bottom != 0 and top == 0:
            return 'bull'
        if top != 0 and bottom == 0:
            return 'bear'
        return np.nan

    def generate_signals(self, trigger_data, context_data, direction='forward'):
        """Attach the market context to every trigger row and derive the action.

        Args:
            trigger_data: Output of ``compute_triggers`` (timestamps in ``t``).
            context_data: Output of ``compute_context`` (timestamps in ``t``).
            direction: ``direction`` argument forwarded to ``pandas.merge_asof``.
                The default ``'forward'`` is kept for backward compatibility. It
                pairs each trigger row with the first context row at or after it.
                NOTE(review): that uses context derived from later data, which is
                look-ahead in a backtest. ``'backward'`` uses only the most recent
                context row at or before each trigger row.

        Returns:
            DataFrame indexed by ``t`` with the trigger columns, ``context``,
            ``top_detection``, ``bottom_detection`` and ``action`` (``'long'``,
            ``'short'`` or ``'cash'``).
        """
        context_columns = ['context', 'top_detection', 'bottom_detection']

        # Merge on an explicit 't' column so the result never depends on how
        # merge_asof treats index levels.
        left = trigger_data if 't' in trigger_data.columns else trigger_data.reset_index()
        if 't' in context_data.columns:
            right = context_data[['t'] + context_columns]
        else:
            right = context_data[context_columns].reset_index()

        full_data = pd.merge_asof(left, right, on='t', direction=direction)

        # Trigger rows without a matching context row inherit the previous one.
        for column in context_columns:
            full_data[column] = full_data[column].ffill()

        is_bull = full_data['context'] == 'bull'
        is_bear = full_data['context'] == 'bear'
        ssr_buy = full_data['ssr_buy_signal'] == 1
        ssr_sell = full_data['ssr_sell_signal'] == 1

        # Default action is cash; later assignments override earlier ones.
        full_data['action'] = 'cash'
        full_data.loc[is_bull & ssr_buy, 'action'] = 'long'
        full_data.loc[is_bull & ssr_sell, 'action'] = 'cash'
        full_data.loc[is_bear & ssr_sell, 'action'] = 'short'

        return full_data.set_index('t')




In [4]:
class Backtester:
    """Backtest the ``action`` column of a signal frame against the BTC price.

    The frame passed in is modified in place: ``position``, ``market_return``,
    ``market_portfolio_cumulative_return``, ``strategy_return`` and
    ``strategy_cumulative_return`` columns are added to it, and
    ``StrategyVisualizer`` reads them from the same frame.

    Args:
        data: DataFrame with a datetime index and the columns
            ``price_usd_close`` and ``action`` (``'long'``, ``'short'`` or ``'cash'``).
        initial_capital: Starting portfolio value.
        periods_per_year: Number of rows per year used to annualise volatility
            and the Sharpe/Sortino ratios. The default 8760 corresponds to hourly bars.
    """

    def __init__(self, data, initial_capital=10000, periods_per_year=8760):
        self.data = data
        self.initial_capital = initial_capital
        self.periods_per_year = periods_per_year
        self.results = {}
        self.run_backtest()

    def run_backtest(self):
        """Add position, return and equity-curve columns, then compute statistics."""
        data = self.data
        capital = self.initial_capital

        # Unknown or missing actions are treated as cash (position 0).
        data['position'] = data['action'].map({'long': 1, 'short': -1, 'cash': 0}).fillna(0)
        data['market_return'] = data['price_usd_close'].pct_change()

        # The first row has no return; both equity curves start at the initial capital.
        # Results are assigned back (no chained in-place fillna) so this also
        # works when pandas copy-on-write is active.
        market_equity = (1 + data['market_return']).cumprod() * capital
        data['market_portfolio_cumulative_return'] = market_equity.fillna(capital)

        # The position decided on one row earns the next row's return.
        data['strategy_return'] = data['position'].shift(1) * data['market_return']
        strategy_equity = (1 + data['strategy_return']).cumprod() * capital
        data['strategy_cumulative_return'] = strategy_equity.fillna(capital)

        self.calculate_statistics()

    def calculate_statistics(self):
        """Fill ``self.results`` with the summary statistics of the backtest."""
        data = self.data
        results = self.results
        equity = data['strategy_cumulative_return']
        strategy_return = data['strategy_return']
        annual_factor = np.sqrt(self.periods_per_year)

        results['Start'] = data.index.min()
        results['End'] = data.index.max()
        results['Duração'] = results['End'] - results['Start']
        results['Tempo de Exposição [%]'] = np.mean(data['position'] != 0) * 100
        results['Saldo Final [$]'] = equity.iloc[-1]
        results['Pico de Valor [$]'] = equity.max()
        results['Retorno AlfaTrader [%]'] = ((results['Saldo Final [$]'] / self.initial_capital - 1) * 100)
        results['Retorno Buy & Hold [%]'] = ((data['price_usd_close'].iloc[-1] / data['price_usd_close'].iloc[0] - 1) * 100)

        # Simple (non-compounded) annualisation over whole days; undefined for
        # backtests shorter than one day.
        years = results['Duração'].days / 365.25
        results['Retorno (Ann.) [%]'] = results['Retorno AlfaTrader [%]'] / years if years > 0 else np.nan
        results['Volatilidade (Ann.) [%]'] = strategy_return.std() * annual_factor * 100

        # Only rows where the strategy return is non-zero count as active.
        active_returns = strategy_return[strategy_return != 0]

        # NOTE(review): the numerator averages active rows only while the
        # denominator is the deviation over all rows - confirm this is intended.
        if active_returns.empty:
            results['Sharpe Ratio'] = 0  # No active returns
        else:
            results['Sharpe Ratio'] = (active_returns.mean() / strategy_return.std()) * annual_factor

        # Downside deviation is the standard deviation of the negative returns.
        negative_returns = strategy_return[strategy_return < 0]
        if not negative_returns.empty and not active_returns.empty:
            results['Sortino Ratio'] = (active_returns.mean() / negative_returns.std()) * annual_factor
        else:
            results['Sortino Ratio'] = 0  # No negative returns or no active returns

        # Every change of position counts as one trade.
        # TODO: drawdown and per-trade statistics are not reported yet.
        trades = data['position'].diff().fillna(0) != 0
        results['# Trades'] = trades.sum()

    def get_results(self):
        """Return the statistics as a one-row DataFrame (one column per statistic)."""
        return pd.DataFrame([self.results])

In [5]:
class StrategyVisualizer:
    """Plotly charts for a backtested signal frame.

    ``data`` is expected to carry the price, action, detection and equity-curve
    columns read by the plotting methods below.
    """

    def __init__(self, data):
        self.data = data

    def plot_results(self):
        """Show portfolio value (strategy vs. buy and hold) above the BTC price with action markers."""
        frame = self.data
        timeline = frame.index

        # Two stacked panels sharing the time axis.
        fig = make_subplots(
            rows=2,
            cols=1,
            shared_xaxes=True,
            vertical_spacing=0.1,
            subplot_titles=('Valor do Portfólio', 'Preço BTC com Sinais'),
        )

        # (column, legend label, line colour, subplot row)
        line_specs = (
            ('strategy_cumulative_return', 'AlfaTrader ®', 'green', 1),
            ('market_portfolio_cumulative_return', 'Comprar e Segurar', 'blue', 1),
            ('price_usd_close', 'Preço BTC', 'black', 2),
        )
        for column, label, colour, panel in line_specs:
            fig.add_trace(
                go.Scatter(x=timeline, y=frame[column], name=label, line=dict(color=colour)),
                row=panel,
                col=1,
            )

        # Keep only the rows where the action differs from the previous row.
        switched = frame['action'] != frame['action'].shift(1)
        action_changes = frame[switched]

        # (action value, marker colour, marker symbol, legend label)
        marker_specs = (
            ('long', '#4AD811', 'triangle-up', 'Compra'),
            ('cash', '#FFCC2D', 'square', 'Caixa'),
            ('short', '#C70039', 'triangle-down', 'Venda'),
        )
        for action, colour, symbol, label in marker_specs:
            points = action_changes[action_changes['action'] == action]
            fig.add_trace(
                go.Scatter(
                    x=points.index,
                    y=points['price_usd_close'],
                    mode='markers',
                    marker=dict(color=colour, size=8, symbol=symbol),
                    name=label,
                ),
                row=2,
                col=1,
            )

        fig.update_layout(
            height=800,
            width=1000,
            title_text="",
            hovermode="x unified",
            plot_bgcolor='white',
            paper_bgcolor='white',
            font=dict(color='black'),
        )
        fig.show()

    def plot_btc_with_signals(self):
        """Show the BTC price line with bars on rows flagged as top or bottom detections."""
        frame = self.data
        timeline = frame.index
        btc_price = frame['price_usd_close']

        fig = make_subplots(specs=[[{"secondary_y": True}]])

        fig.add_trace(
            go.Scatter(x=timeline, y=btc_price, name='Preço BTC', line=dict(color='blue')),
            secondary_y=False,
        )

        # Bars take the price where the detection column is non-zero and 0 elsewhere.
        # (detection column, legend label, bar colour)
        bar_specs = (
            ('top_detection', 'Detecção de Topo', 'green'),
            ('bottom_detection', 'Detecção de Fundo', 'red'),
        )
        for column, label, colour in bar_specs:
            heights = btc_price.where(frame[column] != 0, 0)
            fig.add_trace(
                go.Bar(x=timeline, y=heights, name=label, marker_color=colour),
                secondary_y=False,
            )

        layout_options = dict(
            title='Preço BTC com Indicadores de Topo e Fundo',
            xaxis_title='',
            yaxis_title='Preço BTC',
            legend_title='Legenda',
            hovermode='x unified',
            plot_bgcolor='white',
            paper_bgcolor='white',
            font=dict(color='black'),
        )
        fig.update_layout(**layout_options)

        # Title of the primary y-axis.
        fig.update_yaxes(title_text="Preço BTC", secondary_y=False)

        fig.show()


In [12]:
# USAGE
# End-to-end run: fetch the data, compute context and triggers, then merge them
# into one frame that carries an 'action' per trigger row.
# DataManager reads the API key and the metric endpoints from environment variables.
data_manager=DataManager()
strategy = TradingStrategy(data_manager=data_manager)
# Context is computed from 2011-08-01; triggers only from 2020-12-01.
context_data = strategy.compute_context('2011-08-1', '2024-06-19')
trigger_data = strategy.compute_triggers('2020-12-1', '2024-6-19')
# One row per trigger timestamp, with context, detections and the resulting action.
final_data = strategy.generate_signals(trigger_data, context_data)


In [13]:
# Run the backtest with the default initial capital (10000). Backtester also adds
# its position/return/equity columns to final_data in place.
backtester = Backtester(final_data)
# Transpose the one-row result so each statistic is shown on its own line.
results = backtester.get_results().T
results

Unnamed: 0,0
Start,2020-12-01 00:00:00
End,2024-06-19 02:00:00
Duração,1296 days 02:00:00
Tempo de Exposição [%],50.13341
Saldo Final [$],41578.142407
Pico de Valor [$],45003.395912
Retorno AlfaTrader [%],315.781424
Retorno Buy & Hold [%],234.947533
Retorno (Ann.) [%],88.996269
Volatilidade (Ann.) [%],47.533064


In [14]:
# Must run after the Backtester cell: plot_results reads the equity-curve columns
# that Backtester added to final_data.
visualizer = StrategyVisualizer(final_data)
#visualizer.plot_btc_with_signals()
visualizer.plot_results()
