# Liquidity risk example

An example strategy backtest against available liquidity data.

- Based on `portfolio-construction` and `alternative-data` examples
    - Trades using portfolio construction and alpha model
- Run a [portfolio construction strategy](https://tradingstrategy.ai/blog/writing-portfolio-construction-strategy-in-python) based on the signal
    - In a real strategy this signal needs to be combined with other signals.
- Checks that a trading pair exceeds a liquidity threshold so it can be traded,
  based on the available historical liquidity at the point of time


# Prerequisites

To run this backtest, you first need to run `scripts/prefilter-polygon.py`. It builds the Polygon dataset (> 1 GB) used by this backtest from more than 10 GB of downloaded data.

In [None]:
from pathlib import Path

# Both files are written by scripts/prefilter-polygon.py
liquidity_output_fname = Path("/tmp/polygon-liquidity-prefiltered.parquet")
price_output_fname = Path("/tmp/polygon-price-prefiltered.parquet")

# Stop right away if either dataset is missing.
# Price data is checked first, then liquidity data.
for _required_file in (price_output_fname, liquidity_output_fname):
    assert _required_file.exists(), "Run prefilter script first"


# Set up

Set up Trading Strategy data client.


In [None]:
from tradeexecutor.utils.notebook import setup_charting_and_output
from tradingstrategy.client import Client
from tradeexecutor.utils.notebook import setup_charting_and_output, OutputMode

# Trading Strategy data client, used later to fetch exchange and pair data
client = Client.create_jupyter_client()

# Alternative (disabled): draw charts in interactive vector output mode.
# This is slower than the static mode enabled below.
# Kernel restart needed if you change output mode.
# setup_charting_and_output(OutputMode.interactive)

# Enabled: render static PNG images.
# This is much faster but disables zoom on any chart.
setup_charting_and_output(OutputMode.static, image_format="png", width=1500, height=1000)


# Parameters

- Strategy parameters define the fixed and grid searched parameters

In [None]:
from tradingstrategy.chain import ChainId
import datetime

from tradeexecutor.strategy.default_routing_options import TradeRouting
from tradingstrategy.timebucket import TimeBucket
from tradeexecutor.strategy.cycle import CycleDuration
from tradeexecutor.strategy.parameters import StrategyParameters


class Parameters:
    """Parameters for this strategy.

    - Collect parameters used for this strategy here

    - Both live trading and backtesting parameters
    """

    id = "portfolio-construction" # Used in cache paths and as the backtest name

    cycle_duration = CycleDuration.d1  # Daily rebalance
    candle_time_bucket = TimeBucket.d1  # Price candles are resampled to this bucket for strategy decisions
    allocation = 0.98   # Fraction of total equity targeted for positions on each rebalance
    
    max_assets = 4  # Maximum number of assets held in the portfolio at once
    rsi_length = 7   # Use 7 days RSI as the alpha signal for a trading pair
    minimum_rebalance_trade_percent = 0.25  # Portfolio must change at least 1/4 before we start doing rebalance trades

    #
    # Filtering and data quality
    #
    min_liquidity = 100_000  # Do not trade pairs below this amount of USD locked in TVL
    min_price = 0.000001  # Filter out trading pairs with bad price units (lowest price must stay above this)
    max_price = 1_000_000  # Filter out trading pairs with bad price units (highest price must stay below this)

    #
    # Live trading only
    #
    chain_id = ChainId.polygon
    routing = TradeRouting.default  # Pick default routes for trade execution
    required_history_period = datetime.timedelta(days=rsi_length + 1)  # RSI window plus one extra day

    #
    # Backtesting only
    #
    backtest_start = datetime.datetime(2023, 1, 1)
    backtest_end = datetime.datetime(2024, 1, 1)
    initial_cash = 10_000
    stop_loss_time_bucket = TimeBucket.h1  # Must match scripts/prefilter-polygon.py


parameters = StrategyParameters.from_class(Parameters)  # Convert to AttributedDict for easier typing with dot notation, e.g. parameters.rsi_length


# Custom data

Load the custom data from a CSV file.

- Load using Pandas
- This data will be split and mapped to per-pair indicators later on, as the data format is per-pair

*Note*: Relative paths work differently in different notebook run-time environments. The path below is for Visual Studio Code.

In [None]:
import os
import pandas as pd

# The CSV location can be overridden with an environment variable,
# for example when running with command line ipython
CSV_PATH = os.getenv("CSV_PATH", "../../data/df_trend_polygon.csv")

# For the repo, we keep a partial sample of the data
custom_data_df = pd.read_csv(CSV_PATH)
custom_data_df.index = pd.DatetimeIndex(custom_data_df["date"])

# Framework operates with lowercased addresses everywhere
custom_data_df["contract_address"] = custom_data_df["contract_address"].str.lower()

# Timestamps of the first and the last row, in the order they appear in the CSV
start, end = custom_data_df.index[0], custom_data_df.index[-1]

csv_token_list = custom_data_df["contract_address"].unique().tolist()
print(f"CSV contains data for {len(csv_token_list)} tokens, time range {start} - {end}")

# Per-token groups of the custom data, keyed by contract address
custom_data_group = custom_data_df.groupby("contract_address")


# Trading pairs and market data

- Read preprocessed price OHLCV and liquidity dataframes in memory
- Get a list of ERC-20 tokens we are going to trade on Polygon
- Create an intersection of 1) our tokens from the CSV and 2) trading pairs with Polygon data
- Apply liquidity prefiltering based on `Parameters.min_liquidity`

In [None]:
from tradingstrategy.universe import Universe
from tradingstrategy.liquidity import GroupedLiquidityUniverse
from tradeexecutor.strategy.pandas_trader.alternative_market_data import resample_multi_pair
from tradingstrategy.candle import GroupedCandleUniverse
from tradingstrategy.pair import filter_for_base_tokens, PandasPairUniverse, StablecoinFilteringMode, \
    filter_for_stablecoins
from tradingstrategy.client import Client

from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse, translate_token
from tradeexecutor.strategy.execution_context import ExecutionContext, notebook_execution_context
from tradeexecutor.strategy.universe_model import UniverseOptions

# Token addresses, written in lowercase
WMATIC = "0x0d500b1d8e8ef31e21c99d1db9a6444d3adf1270"
USDCE = "0x2791bca1f2de4661ed88a30c99a7a9449aa84174"  # Looked up as the reserve asset of the universe

# We care only about Quickswap and Uniswap v3 pairs
SUPPORTED_DEXES = {"quickswap", "uniswap-v3"}

# Every token found in the CSV, plus the hardcoded WMATIC
custom_data_token_set = {WMATIC, *csv_token_list}

def create_trading_universe(
    timestamp: datetime.datetime | None,
    client: Client,
    execution_context: ExecutionContext,
    universe_options: UniverseOptions,
) -> TradingStrategyUniverse:
    """Create the trading universe from the prefiltered Parquet datasets.

    - Load prefiltered OHLCV and liquidity data and crop it to the requested range

    - Keep only trading pairs that pass the liquidity, price unit, DEX,
      custom data token and stablecoin filters

    - Resample price candles to ``Parameters.candle_time_bucket`` for strategy
      decisions, and keep the unresampled candles for stop loss backtesting

    :param timestamp:
        Not used by this function.

    :param client:
        Client used to fetch the exchange and trading pair universes.

    :param execution_context:
        Not used by this function.

    :param universe_options:
        Provides ``start_at`` and ``end_at`` used to crop the data.

    :return:
        The strategy universe with candles, liquidity and stop loss data.

    :raise RuntimeError:
        If the reserve token is not found among the trading pairs that
        passed the filters.
    """

    start_at = pd.Timestamp(universe_options.start_at)
    end_at = pd.Timestamp(universe_options.end_at)

    print(f"Backtesting {start_at} - {end_at}")

    chain_id = Parameters.chain_id

    exchange_universe = client.fetch_exchange_universe()
    exchange_universe = exchange_universe.limit_to_chains({chain_id}).limit_to_slugs(SUPPORTED_DEXES)
    print(f"We support {exchange_universe.get_exchange_count()} DEXes")

    pairs_df = client.fetch_pair_universe().to_pandas()

    liquidity_df = pd.read_parquet(liquidity_output_fname)
    price_df = pd.read_parquet(price_output_fname)

    # When reading from Parquet file, we need to deal with indexing by hand
    liquidity_df.index = pd.DatetimeIndex(liquidity_df.timestamp)
    price_df.index = pd.DatetimeIndex(price_df.timestamp)

    print(f"Prefilter data contains {len(liquidity_df):,} liquidity samples and {len(price_df):,} OHLCV candles")

    # Crop price and liquidity data to our backtesting range
    price_df = price_df.loc[(price_df.timestamp >= start_at) & (price_df.timestamp <= end_at)]
    liquidity_df = liquidity_df.loc[(liquidity_df.timestamp >= start_at) & (liquidity_df.timestamp <= end_at)]

    # Liquidity filter: a pair passes if its liquidity exceeded the minimum
    # at any point within the cropped range
    peak_liquidity = liquidity_df.groupby("pair_id")["high"].max()
    print(f"Chain {chain_id.name} has liquidity data for {len(peak_liquidity)} trading pairs")

    liquid_pair_ids = peak_liquidity.index[peak_liquidity > Parameters.min_liquidity]
    pairs_df = pairs_df.loc[pairs_df.pair_id.isin(liquid_pair_ids)]
    print(f"After liquidity filter we have {len(pairs_df)} trading pairs")

    # Broken price unit filter: the whole price history of a pair
    # must stay strictly between min_price and max_price
    price_ranges = price_df.groupby("pair_id").agg(
        highest=("high", "max"),
        lowest=("low", "min"),
    )
    sane_price_mask = (price_ranges["highest"] < Parameters.max_price) & (price_ranges["lowest"] > Parameters.min_price)
    sane_price_pair_ids = price_ranges.index[sane_price_mask]

    pairs_df = pairs_df.loc[pairs_df.pair_id.isin(sane_price_pair_ids)]
    print(f"After broken price unit filter we have {len(pairs_df)} trading pairs")

    allowed_exchange_ids = set(exchange_universe.exchanges.keys())
    pairs_df = pairs_df.loc[pairs_df.exchange_id.isin(allowed_exchange_ids)]
    print(f"After DEX filter we have {len(pairs_df)} trading pairs")

    # Do cross-section of Polygon tokens from custom data
    pairs_df = filter_for_base_tokens(pairs_df, custom_data_token_set)
    pairs_df = filter_for_stablecoins(pairs_df, StablecoinFilteringMode.only_volatile_pairs)
    print(f"After custom data ERC-20 token address cross section filter we have {len(pairs_df)} matching trading pairs")

    # Resample strategy decision candles to daily
    daily_candles = resample_multi_pair(price_df, Parameters.candle_time_bucket)
    daily_candles["timestamp"] = daily_candles.index

    print(f"After downsampling we have {len(daily_candles)} OHLCV candles and {len(liquidity_df)} liquidity samples")
    # Forward fill should make sure we can always calculate RSI and other indicators
    candle_universe = GroupedCandleUniverse(daily_candles, forward_fill=True)
    liquidity_universe = GroupedLiquidityUniverse(liquidity_df)

    # The final trading pair universe contains metadata only for pairs that passed
    # our filters
    pairs_universe = PandasPairUniverse(pairs_df, exchange_universe=exchange_universe)

    # Stop loss backtesting uses the unresampled candles
    stop_loss_candle_universe = GroupedCandleUniverse(price_df)

    # NOTE(review): liquidity_df is passed in without resampling, while its time bucket
    # is declared as candle_time_bucket - confirm the prefiltered liquidity data
    # really uses that bucket
    data_universe = Universe(
        time_bucket=Parameters.candle_time_bucket,
        liquidity_time_bucket=Parameters.candle_time_bucket,
        exchange_universe=exchange_universe,
        pairs=pairs_universe,
        candles=candle_universe,
        liquidity=liquidity_universe,
        chains={chain_id},
    )

    # Fail with a clear message instead of an obscure error inside translate_token()
    # if filtering dropped every pair that contains the reserve token
    reserve_token = pairs_universe.get_token(USDCE)
    if reserve_token is None:
        raise RuntimeError(f"Reserve token {USDCE} is not part of any trading pair that passed the filters")
    reserve_asset = translate_token(reserve_token)

    return TradingStrategyUniverse(
        data_universe=data_universe,
        backtest_stop_loss_time_bucket=Parameters.stop_loss_time_bucket,
        backtest_stop_loss_candles=stop_loss_candle_universe,
        reserve_assets={reserve_asset},
    )


# Universe options are derived from the strategy parameters class
_universe_options = UniverseOptions.from_strategy_parameters_class(
    Parameters,
    notebook_execution_context,
)

# Build the universe once; the cells below share it
strategy_universe = create_trading_universe(
    timestamp=None,
    client=client,
    execution_context=notebook_execution_context,
    universe_options=_universe_options,
)

# Indicators

- We use `pandas_ta` Python package to calculate technical indicators
- These indicators are precalculated and cached on the disk

In [None]:
import pandas as pd
import pandas_ta

from tradeexecutor.analysis.regime import Regime
from tradeexecutor.strategy.execution_context import ExecutionContext
from tradeexecutor.strategy.pandas_trader.indicator import IndicatorSet, IndicatorSource
from tradeexecutor.strategy.parameters import StrategyParameters
from tradeexecutor.strategy.trading_strategy_universe import TradingStrategyUniverse
from tradingstrategy.utils.groupeduniverse import resample_candles


def rsi_safe(close_series: pd.Series, length: int):
    """Calculate RSI, tolerating series that are too short.

    Works around tokens that appear only for a few hours.

    :param close_series:
        Series the RSI is calculated over.

    :param length:
        RSI length, passed to ``pandas_ta.rsi()``.

    :return:
        The result of ``pandas_ta.rsi()``, or an empty ``float64`` series
        with an empty ``DatetimeIndex`` if the series has ``length`` or fewer values.
    """
    if len(close_series) <= length:
        # The token did not trade long enough for RSI to ever be calculated
        return pd.Series(dtype="float64", index=pd.DatetimeIndex([]))

    return pandas_ta.rsi(close_series, length)


def create_indicators(
    timestamp: datetime.datetime | None,
    parameters: StrategyParameters,
    strategy_universe: TradingStrategyUniverse,
    execution_context: ExecutionContext
):
    """Define the indicators used by this strategy.

    Registers a single ``rsi`` indicator, calculated with ``rsi_safe()``
    from close prices, using ``parameters.rsi_length`` as its length.

    :return:
        Indicator set containing the ``rsi`` indicator.
    """
    indicator_set = IndicatorSet()
    rsi_parameters = {"length": parameters.rsi_length}
    indicator_set.add("rsi", rsi_safe, rsi_parameters, IndicatorSource.close_price)
    return indicator_set


# Trading algorithm

- Describe our trading strategy as code

In [None]:
from tradeexecutor.state.visualisation import PlotKind
from tradeexecutor.state.trade import TradeExecution
from tradeexecutor.strategy.pandas_trader.strategy_input import StrategyInput
from tradeexecutor.strategy.weighting import weight_by_1_slash_n
from tradeexecutor.strategy.alpha_model import AlphaModel
from tradeexecutor.strategy.pandas_trader.strategy_input import IndicatorDataNotFoundWithinDataTolerance


def decide_trades(
    input: StrategyInput,
) -> list[TradeExecution]:
    """Decide the rebalance trades for one strategy cycle.

    - Skip trading pairs whose TVL is below ``parameters.min_liquidity``

    - Use RSI above 50 as a long-only signal for a pair

    - Pick the top ``parameters.max_assets`` signals, weight them 1/N and
      generate the rebalance trades towards the target positions

    :param input:
        Strategy cycle input: parameters, state, timestamp, indicators and universe.

    :return:
        The trades generated by the alpha model for this cycle.

    :raise RuntimeError:
        If the portfolio total equity exceeds 1,000,000 USD,
        which is treated as a sign of broken valuation math.
    """

    #
    # Decision cycle setup.
    # Read all variables we are going to use for the decisions.
    #
    parameters = input.parameters
    position_manager = input.get_position_manager()
    state = input.state
    timestamp = input.timestamp
    indicators = input.indicators
    strategy_universe = input.strategy_universe

    # Another low cap problem checker.
    # Doing a bad non-liquidity trade may break the valuation calculations.
    total_equity = state.portfolio.get_total_equity()
    if total_equity > 1_000_000:
        position_valuations = "\n".join([f"{p} (token {p.pair.base.address}): {p.get_value()}" for p in state.portfolio.open_positions.values()])
        raise RuntimeError(f"Portfolio total equity exceeded 1,000,000 USD. Some broken math likely happened. Total equity is {total_equity} USD.\nOpen positions:\n{position_valuations}")

    #
    # Trading logic
    #
    # We do some extra checks here as we are trading low quality
    # low cap tokens which often have outright malicious data for trading.
    #

    alpha_model = AlphaModel(timestamp)

    for pair in strategy_universe.iterate_pairs():

        # Available trading liquidity,
        # read from the last day sample.
        # NOTE(review): the None check assumes get_tvl() may return None when
        # no liquidity sample is available - confirm against the framework.
        tvl = indicators.get_tvl(pair=pair)
        if tvl is None or tvl < parameters.min_liquidity:
            # Not yet enough liquidity to trade
            continue

        # Does the trading pair have good RSI in this point of history?
        rsi = indicators.get_indicator_value("rsi", pair=pair)
        if rsi is None:
            continue

        # Interpret RSI above +50 as a positive momentum
        signal = rsi - 50
        if signal > 0:
            # Consider positive signals only, because we cannot short
            alpha_model.set_signal(pair, signal)

    # Use alpha model and construct a portfolio of the top assets
    alpha_model.select_top_signals(parameters.max_assets)
    alpha_model.assign_weights(weight_by_1_slash_n)
    alpha_model.normalise_weights()
    alpha_model.update_old_weights(state.portfolio)
    portfolio = position_manager.get_current_portfolio()
    portfolio_target_value = portfolio.get_total_equity() * parameters.allocation
    alpha_model.calculate_target_positions(position_manager, portfolio_target_value)
    trades = alpha_model.generate_rebalance_trades_and_triggers(
        position_manager,
        min_trade_threshold=parameters.minimum_rebalance_trade_percent * portfolio.get_total_equity(),
    )

    # Visualisations
    #
    if input.is_visualisation_enabled():
        state.visualisation.add_calculations(timestamp, alpha_model.to_dict())  # Record alpha model thinking

    return trades  # Return the list of trades we made in this cycle

# Backtest

- Run the backtest

In [None]:
import logging
from tradeexecutor.backtest.backtest_runner import run_backtest_inline

# We need to set this really high value, because
# some low cap tokens may only see 1-2 trades per year
# and our backtesting framework aborts if it thinks
# there is an issue with data quality
_data_delay_tolerance = pd.Timedelta(days=365)

result = run_backtest_inline(
    name=parameters.id,
    engine_version="0.5",
    decide_trades=decide_trades,
    create_indicators=create_indicators,
    client=client,
    universe=strategy_universe,
    parameters=parameters,
    strategy_logging=False,
    max_workers=1,
    data_delay_tolerance=_data_delay_tolerance,

    # Uncomment to enable verbose logging
    # log_level=logging.INFO,
)

state = result.state

# Count the trades without keeping them all in a list
trade_count = sum(1 for _ in state.portfolio.get_all_trades())
print(f"Backtesting completed, backtested strategy made {trade_count} trades")

# Equity curve

- Equity curve shows how your strategy accrues value over time
- A good equity curve has a stable ascending angle
- Benchmark against MATIC buy and hold

In [None]:
import pandas as pd
from tradeexecutor.analysis.multi_asset_benchmark import get_benchmark_data
from tradeexecutor.visual.benchmark import visualise_equity_curve_benchmark

# Pulls WMATIC/USDC as the benchmark
# The benchmark and the equity curve both start from the portfolio's initial cash
_initial_cash = state.portfolio.get_initial_cash()

# Pulls WMATIC/USDC as the benchmark
benchmark_indexes = get_benchmark_data(
    strategy_universe,
    cumulative_with_initial_cash=_initial_cash,
)

fig = visualise_equity_curve_benchmark(
    name=state.name,
    portfolio_statistics=state.stats.portfolio,
    all_cash=_initial_cash,
    benchmark_indexes=benchmark_indexes,
    height=800,
    log_y=True,
)

fig.show()

# Performance metrics

- Display portfolio performance metrics
- Compare against buy and hold MATIC using the same initial capital

In [None]:
from tradeexecutor.analysis.multi_asset_benchmark import compare_strategy_backtest_to_multiple_assets

# Compare the backtest result in `state` against assets of the strategy universe.
# display=True is passed so the comparison is shown in the notebook output.
compare_strategy_backtest_to_multiple_assets(
    state,
    strategy_universe,
    display=True,
)

# Trading statistics

- Display a summary of the trades made

In [None]:
from tradeexecutor.analysis.trade_analyser import build_trade_analysis

# Analyse the trades of the backtested portfolio and calculate summary statistics
analysis = build_trade_analysis(state.portfolio)
summary = analysis.calculate_summary_statistics()
# display() is not imported in this notebook; presumably the IPython/Jupyter built-in
display(summary.to_dataframe())