# RSI momentum strategy for several pairs

- This is a breakout strategy using RSI indicator as the indicator for momentum
- We buy and sell based on RSI levels
- The rebalance is slow and designed for high fee environment
- This notebook does an initial grid search


# Parameters

- Strategy parameters define the fixed and grid searched parameters

In [16]:
import datetime

from tradeexecutor.strategy.default_routing_options import TradeRouting
from tradingstrategy.timebucket import TimeBucket
from tradeexecutor.strategy.cycle import CycleDuration


class Parameters:
    """Parameteres for this strategy.

    - Collect parameters used for this strategy here

    - Both live trading and backtesting parameters
    """

    id = "rsi-portfolio-v01"
    cycle_duration = CycleDuration.cycle_1d  # Run decide_trades() every 8h
    source_time_bucket = TimeBucket.d1  # Use 1h candles as the raw data
    clock_shift_bars = 0  # Do not do shifted candles
    upsample = None  # Do not upsample candles

    rsi_bars = 12  # Number of bars to calculate RSI for each tradingbar
    matic_eth_rsi_bars = 5  # Number of bars for the momentum factor
    rsi_entry = [50, 60, 70, 80]  # Single pair entry level - when RSI crosses above this value open a position
    rsi_exit = [55, 60, 65, 70]  # Single pair exit level - when RSI crosses below this value exit a position
    allocation = 0.98  # How much cash allocate for volatile positions
    rebalance_threshold = 0.10  # How much position mix % must change when we rebalance between two open positions
    initial_cash = 10_000  # Backtesting start cash
    trailing_stop_loss = 0.990000  # Trailing stop loss as 1 - x
    trailing_stop_loss_activation_level = 1.07  # How much above opening price we must be before starting to use trailing stop loss
    stop_loss = 0.80  # Hard stop loss when opening a new position

    #
    # Live trading only
    #
    routing = TradeRouting.default  # Pick default routes for trade execution
    required_history_period = rsi_bars * 2  # How much data a live trade execution needs to load to be able to calculate indicators

    #
    # Backtesting only
    #

    backtest_start = datetime.datetime(2022, 1, 1)
    backtest_end = datetime.datetime(2024, 3, 15)
    stop_loss_time_bucket = TimeBucket.h1  # use 1h close as the stop loss signal

# Trading pairs and market data

- Set up our trading pairs
- Load historical market data for backtesting
- We use Binance CEX data so we have longer history to backtest

In [17]:
import datetime
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.chain import ChainId
from tradeexecutor.utils.binance import create_binance_universe

# Randomly picked well-known token set with some survivorship bias
traded_pair_ids = [
    (ChainId.centralised_exchange, "binance", "ETH", "USDT"),
    (ChainId.centralised_exchange, "binance", "MATIC", "USDT"),
    (ChainId.centralised_exchange, "binance", "BTC", "USDT"),
    (ChainId.centralised_exchange, "binance", "SOL", "USDT"),
    (ChainId.centralised_exchange, "binance", "ADA", "USDT"),
    (ChainId.centralised_exchange, "binance", "XRP", "USDT"),
    (ChainId.centralised_exchange, "binance", "XTZ", "USDT"),
    (ChainId.centralised_exchange, "binance", "AVAX", "USDT"),
    (ChainId.centralised_exchange, "binance", "DOT", "USDT"),
    (ChainId.centralised_exchange, "binance", "CAKE", "USDT"),
    # (ChainId.centralised_exchange, "binance", "AEVO", "USDT"),
    (ChainId.centralised_exchange, "binance", "BNB", "USDT"),
]

strategy_universe = create_binance_universe(
    [f"{desc[2]}{desc[3]}" for desc in traded_pair_ids],
    candle_time_bucket=TimeBucket.d1,
    stop_loss_time_bucket=TimeBucket.h1,
    start_at=datetime.datetime(2018, 1, 1),
    end_at=datetime.datetime(2024, 3, 10),
    include_lending=False
)



  0%|          | 0/11 [00:00<?, ?it/s]

# Indicators

- Create the indicators we use in this strategy

In [18]:
import pandas_ta
import pandas as pd

from tradeexecutor.strategy.execution_context import ExecutionContext
from tradeexecutor.strategy.pandas_trader.indicator import IndicatorSet, IndicatorSource
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse
from tradingstrategy.utils.groupeduniverse import resample_price_series


def calculate_shifted_rsi(pair_close_price_series: pd.Series, length: int, upsample: TimeBucket, shift: int):
    resampled_close = resample_price_series(pair_close_price_series, upsample.to_pandas_timedelta(), shift=shift)
    return pandas_ta.rsi(resampled_close, length=length)

def create_indicators(
    timestamp: datetime.datetime | None,
    parameters: StrategyParameters,
    strategy_universe: TradingStrategyUniverse,
    execution_context: ExecutionContext
):
    indicators = IndicatorSet()
    #indicators.add("rsi", calculate_shifted_rsi, {"length": parameters.rsi_bars, "upsample": parameters.upsample, "shift": parameters.shift})
    indicators.add("rsi", pandas_ta.rsi, {"length": parameters.rsi_bars})
    return indicators


# Trading algorithm

- Describe out trading strategy as code

In [19]:

from tradeexecutor.strategy.alpha_model import AlphaModel
from tradeexecutor.state.visualisation import PlotKind, PlotShape, PlotLabel
from tradeexecutor.strategy.weighting import weight_passthrouh
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.strategy.pandas_trader.strategy_input import StrategyInput


def decide_trades(
    input: StrategyInput,
) -> list[TradeExecution]:

    # Resolve our pair metadata for our two pair strategy
    parameters = input.parameters
    position_manager = input.get_position_manager()
    state = input.state
    timestamp = input.timestamp
    indicators = input.indicators
    shift = parameters.clock_shift_bars
    clock_shift = pd.Timedelta(hours=1) * shift

    # Resolve trading pairs from human descriptions to their ids
    our_pairs = [strategy_universe.get_pair_by_human_description(pair_id) for pair_id in traded_pair_ids]

    # Execute the daily trade cycle when the clock hour 0..24 is correct for our hourly shift
    if parameters.upsample:
        assert parameters.upsample.to_timedelta() >= parameters.cycle_duration.to_timedelta(), "Upsample period must be longer than cycle period"
        assert shift <= 0  # Shift -1 = do action 1 hour later

        # Do the clock shift trick
        if parameters.cycle_duration.to_timedelta() != parameters.upsample.to_timedelta():
            if (input.cycle - 1 + shift) % int(parameters.upsample.to_hours()) != 0:
                return []

    alpha_model = AlphaModel(input.timestamp)
    position_manager.log("decide_trades() start")

    #
    # Indicators
    #
    # Calculate indicators for each pair.
    #

    # Per-trading pair calcualted data
    current_rsi_values = {}  # RSI yesterday
    previous_rsi_values = {}  # RSI day before yesterday
    current_price = {}  # Close price yesterday

    for pair in our_pairs:
        current_price[pair] = indicators.get_price(pair)
        current_rsi_values[pair] = indicators.get_indicator_value("rsi", index=-1, pair=pair, clock_shift=clock_shift)
        previous_rsi_values[pair] = indicators.get_indicator_value("rsi", index=-2, pair=pair, clock_shift=clock_shift)

    #
    # Trading logic
    #

    for pair in our_pairs:
        existing_position = position_manager.get_current_position_for_pair(pair)
        pair_open = existing_position is not None
        signal_strength = current_rsi_values[pair]
        if pd.isna(signal_strength):
            signal_strength = 0
        alpha_model.set_signal(pair, 0)

        if pair_open:
            # We have existing open position for this pair,
            # keep it open by default unless we get a trigger condition below
            position_manager.log(f"Pair {pair} already open")
            alpha_model.set_signal(pair, signal_strength, stop_loss=parameters.stop_loss)

        if current_rsi_values[pair] and previous_rsi_values[pair]:

            # Check for RSI crossing our threshold values in this cycle, compared to the previous cycle
            rsi_cross_above = current_rsi_values[pair] >= parameters.rsi_entry and previous_rsi_values[pair] < parameters.rsi_entry
            rsi_cross_below = current_rsi_values[pair] < parameters.rsi_exit and previous_rsi_values[pair] > parameters.rsi_exit

            if not pair_open:
                # Check for opening a position if no position is open
                if rsi_cross_above:
                    position_manager.log(f"Pair {pair} crossed above")
                    alpha_model.set_signal(pair, signal_strength, stop_loss=parameters.stop_loss)
            else:
                # We have open position, check for the close condition
                if rsi_cross_below:
                    position_manager.log(f"Pair {pair} crossed below")
                    alpha_model.set_signal(pair, 0)

    # Enable trailing stop loss if we have reached the activation level
    if parameters.trailing_stop_loss_activation_level is not None:
       for p in state.portfolio.open_positions.values():
           if p.trailing_stop_loss_pct is None:
               if current_price[p.pair] >= p.get_opening_price() * parameters.trailing_stop_loss_activation_level:
                   p.trailing_stop_loss_pct = parameters.trailing_stop_loss

    # Use alpha model and construct a portfolio of two assets
    alpha_model.select_top_signals(2)
    alpha_model.assign_weights(weight_passthrouh)
    alpha_model.normalise_weights()
    alpha_model.update_old_weights(state.portfolio)
    portfolio = position_manager.get_current_portfolio()
    portfolio_target_value = portfolio.get_total_equity() * parameters.allocation
    alpha_model.calculate_target_positions(position_manager, portfolio_target_value)
    trades = alpha_model.generate_rebalance_trades_and_triggers(
        position_manager,
        min_trade_threshold=parameters.rebalance_threshold * portfolio.get_total_equity(),
    )

    #
    # Visualisations
    #

    if input.is_visualisation_enabled():

        visualisation = state.visualisation  # Helper class to visualise strategy output

        eth_pair = position_manager.get_trading_pair(traded_pair_ids[0])

        if current_rsi_values[eth_pair]:
            visualisation.plot_indicator(
                timestamp,
                f"RSI ETH",
                PlotKind.technical_indicator_detached,
                current_rsi_values[eth_pair],
                colour="orange",
            )


            # Low (vertical line)
            visualisation.plot_indicator(
                timestamp,
                f"RSI low trigger",
                PlotKind.technical_indicator_overlay_on_detached,
                parameters.rsi_exit,
                detached_overlay_name=f"RSI BTC",
                plot_shape=PlotShape.horizontal_vertical,
                colour="red",
                label=PlotLabel.hidden,
            )

            # High (vertical line)
            visualisation.plot_indicator(
                timestamp,
                f"RSI high trigger",
                PlotKind.technical_indicator_overlay_on_detached,
                parameters.rsi_enter,
                detached_overlay_name=f"RSI BTC",
                plot_shape=PlotShape.horizontal_vertical,
                colour="red",
                label=PlotLabel.hidden,
            )

        state.visualisation.add_calculations(timestamp, alpha_model.to_dict())  # Record alpha model thinking

    return trades

# Grid search

- Run the grid search

In [20]:
from tradeexecutor.backtest.grid_search import GridCombination, get_grid_search_result_path, perform_grid_search, prepare_grid_combinations

# This is the path where we keep the result files around
storage_folder = get_grid_search_result_path(Parameters.id)

# Popular grid search combinations and indicators for them
combinations = prepare_grid_combinations(
    Parameters,
    storage_folder,
    create_indicators=create_indicators,
    strategy_universe=strategy_universe,
)

indicators = GridCombination.get_all_indicators(combinations)

print(f"We prepared {len(combinations)} grid search combinations with total {len(indicators)} indicators which need to be calculated, stored in {storage_folder.resolve()}")

grid_search_results = perform_grid_search(
    decide_trades,
    strategy_universe,
    combinations,
    trading_strategy_engine_version="0.5",
    multiprocess=True,
)

We prepared 16 grid search combinations with total 11 indicators which need to be calculated, stored in /Users/moo/.cache/trading-strategy/grid-search/rsi-portfolio-v01
Using indicator cache /Users/moo/.cache/indicators/centralised-exchange_1d_11_2018-01-01-2024-03-10
No cached grid search results found from previous runs


Searching:   0%|          | 0/16 [00:00<?, ?it/s]

RuntimeError: Running a grid search combination failed:
<GridCombination #7
   allocation=0.98
   backtest_end=2024-03-15 00:00:00
   backtest_start=2022-01-01 00:00:00
   clock_shift_bars=0
   cycle_duration=CycleDuration.cycle_1d
   id=rsi-portfolio-v01
   initial_cash=10000
   matic_eth_rsi_bars=5
   rebalance_threshold=0.1
   required_history_period=24
   routing=TradeRouting.default
   rsi_bars=12
   rsi_entry=60
   rsi_exit=65
   source_time_bucket=TimeBucket.d1
   stop_loss=0.8
   stop_loss_time_bucket=TimeBucket.h1
   trailing_stop_loss=0.99
   trailing_stop_loss_activation_level=1.07
   upsample=None
>

# Analysing results

In [None]:
# Set Jupyter Notebook output mode parameters
from tradeexecutor.analysis.grid_search import find_best_grid_search_results
# from tradeexecutor.analysis.grid_search import visualise_table

# Print extension of our backtest
cached_results = [r for r in grid_search_results if r.cached]
print(f"Grid search results available: {len(grid_search_results)}, of which we got cached {len(cached_results)} in {storage_folder}")

best_result = find_best_grid_search_results(grid_search_results)

## Highest profitability

In [None]:
from tradeexecutor.analysis.grid_search import find_best_grid_search_results, render_grid_search_result_table

best_results = find_best_grid_search_results(grid_search_results)
render_grid_search_result_table(best_result.cagr)

## Highest Sharpe

In [None]:
render_grid_search_result_table(best_result.sharpe)