# Pancakeswap momentum trading

This is an example strategy backtesting how to momentum trading on PancakeSwap.

## Strategy and backtesting parameters

Here we define all parameters that affect the backtest outcome.

In [7]:
import pandas as pd
from tradingstrategy.timebucket import TimeBucket

# The starting date of the backtest
# Note: At the moment, due to QsTrader internal limitation,
# we define this as NYSE UTC trading hours
start = pd.Timestamp('2020-12-01')

# The ending date of the backtest
end = pd.Timestamp('2022-01-01')

# Start backtesting with $10k in hand
initial_cash = 10_000

# Prefiltering to limit the pair set to speed up computations
# How many USD all time buy volume the pair must have had
# to be included in the backtesting
prefilter_min_buy_volume = 5_000_000

# When this USD threshold of bonding curve liquidity provided is reached,
# we ape in to the token on a daily close.
min_liquidity = 250_000

# How many tokens we can hold in our portfolio
# If there are more new tokens coming to market per day,
# we just ignore those with less liquidity
max_assets_per_portfolio = 5

# How many % of all value we hold in cash all the time,
# so that we can sustain hits
cash_buffer = 0.33

# Use daily candles to run the algorithm
candle_time_frame = TimeBucket.d1

## Creating trading universe

First let's import libraries and initialise our dataset client.

In [8]:
try:
    import tradingstrategy
except ImportError:
    !pip install tradingstrategy
    import site
    site.main()

from tradingstrategy.client import Client

client = Client.create_jupyter_client()

Started Trading Strategy in Jupyter notebook environment, configuration is stored in /Users/moo/.tradingstrategy


Let's create a pair universe for PancakeSwap.
We will create a dataset of 2d candles that trade on PancakeSwap on Binance Smart Chain.

In [9]:
import pandas as pd
from tradingstrategy.chain import ChainId
from tradingstrategy.pair import PandasPairUniverse

columnar_pair_table = client.fetch_pair_universe()

exchange_universe = client.fetch_exchange_universe()

all_pairs_dataframe = columnar_pair_table.to_pandas()

our_exchange = exchange_universe.get_by_chain_and_slug(ChainId.bsc, "pancakeswap-v2")
our_pairs: pd.DataFrame = all_pairs_dataframe.loc[
    (all_pairs_dataframe['exchange_id'] == our_exchange.exchange_id) &  # Trades on Sushi
    (all_pairs_dataframe['buy_volume_all_time'] > 500_000)  # 500k min buys
]

# Create a Python set of pair ids
wanted_pair_ids = our_pairs["pair_id"]

# Make the trading pair data easily accessible
pair_universe = PandasPairUniverse(our_pairs)

print(f"Our trading universe has {len(pair_universe.get_all_pair_ids())} pairs that meet the prefiltering criteria")

Our trading universe has 5357 pairs that meet the prefiltering criteria


## Construct backtesting universe

Get daily candles and filter them against our wanted pair set.


We take all trading pairs registered on  (as the writing of this all Uniswap v2 compatible exchanges).
As the number of trading pairs is very high (50k+).
Most of these trading pairs are random noise and crap.
We reduce the number of trading pairs to speed up the backtest simulation, but this also introduce some
survivorship bias.

In [11]:
from tradingstrategy.frameworks.qstrader import prepare_candles_for_qstrader
from tradingstrategy.liquidity import GroupedLiquidityUniverse
from tradingstrategy.pair import PandasPairUniverse
from tradingstrategy.timebucket import TimeBucket
from tradingstrategy.candle import GroupedCandleUniverse
from tradingstrategy.exchange import ExchangeUniverse, Exchange


def prefilter_pairs(all_pairs_dataframe: pd.DataFrame, exchange: Exchange) -> pd.DataFrame:
    """Get rid of pairs that we definitely are not interested in.

    This will greatly speed up the later backtesting computations, as we do not need to
    calculate the opening volumes for thousands of pairs.

    Note that may induce survivorship bias - we use this mainly
    to ensure the backtetst completes in a reasonable time.
    """
    pairs: pd.DataFrame = all_pairs_dataframe.loc[
        (all_pairs_dataframe['exchange_id'] == exchange.exchange_id) &
        (all_pairs_dataframe['buy_volume_all_time'] > prefilter_min_buy_volume)  # 500k min buys
    ]
    return pairs

exchange_universe = client.fetch_exchange_universe()

# Do some test calculations for a single pair
# Note that PancakeSwap has two different deployments:
# you most likely want v2
our_exchange = exchange_universe.get_by_chain_and_slug(ChainId.bsc, "pancakeswap-v2")
assert our_exchange, "Could not find the DEX"

# Decompress the pair dataset to Python map
columnar_pair_table = client.fetch_pair_universe()

# Make our universe 40x smaller and faster to compute
filtered_pairs = prefilter_pairs(columnar_pair_table.to_pandas(), our_exchange)

# Make the trading pair data easily accessible
pair_universe = PandasPairUniverse(filtered_pairs)
wanted_pair_ids = pair_universe.get_all_pair_ids()

# Get daily candles as Pandas DataFrame
all_candles = client.fetch_all_candles(candle_time_frame).to_pandas()
filtered_candles = all_candles.loc[all_candles["pair_id"].isin(wanted_pair_ids)]
candle_universe = GroupedCandleUniverse(prepare_candles_for_qstrader(filtered_candles), timestamp_column="Date")

all_liquidity = client.fetch_all_liquidity_samples(TimeBucket.d1).to_pandas()
filtered_liquidity = all_liquidity.loc[all_liquidity["pair_id"].isin(wanted_pair_ids)]
filtered_liquidity = filtered_liquidity.set_index(filtered_liquidity["timestamp"])
liquidity_universe = GroupedLiquidityUniverse(filtered_liquidity)

print(f"""
Datafeeds set up.

Our trading universe for {candle_time_frame.value} candles is
- {len(wanted_pair_ids)} pairs
- {len(filtered_candles)} candles
- {len(filtered_liquidity)} liquidity samples

The source data for {candle_time_frame.value} has
- {len(columnar_pair_table)} pairs
- {len(all_candles)} candles
""")


Datafeeds set up.

Our trading universe for 1d candles is
- 1488 pairs
- 177673 candles
- 142086 liquidity samples

The source data for 1d has
- 77030 pairs
- 3759703 candles



## Creating the alpha model

In [None]:
from typing import Dict

from qstrader.alpha_model.alpha_model import AlphaModel


def update_pair_liquidity_threshold(
        now_: pd.Timestamp,
        threshold: float,
        reached_state: dict,
        pair_universe: PandasPairUniverse,
        liquidity_universe: GroupedLiquidityUniverse) -> dict:
    """Check which pairs reach the liquidity threshold on a given day.

    :param threshold: Available liquidity, in US dollar

    :return: Dict of pair ids who reached the liquidity threshold and how much liquidity they had
    """

    new_entries = {}

    # QSTrader carries hours in its timestamp like
    # Timestamp('2020-10-01 14:30:00+0000', tz='UTC')
    # as it follows NYSE market open and close timestamps.
    # Capitalgram candle timestamps are in days and mightnight, so we fix it here.
    ts = pd.Timestamp(now_.date())

    for pair_id in pair_universe.get_all_pair_ids():

        # Skip pairs we know reached liquidity threshold earlier
        if pair_id not in reached_state:
            # Get the todays liquidity
            liquidity_samples = liquidity_universe.get_samples_by_pair(pair_id)
            # We determine the available liquidity by the daily open
            try:
                liquidity_today = liquidity_samples["open"][ts]
            except KeyError:
                liquidity_today = 0

            if liquidity_today >= threshold:
                reached_state[pair_id] = now_
                new_entries[pair_id] = liquidity_today

    return new_entries


class MomentumAlphaModel(AlphaModel):
    """An alpha model that ranks pairs by the daily upwords momentum.

    A AlphaModel that provides a single scalar forecast
    value for each Asset in the Universe.

    Parameters
    ----------
    signal_weights : `dict{str: float}`
        The signal weights per asset symbol.
    universe : `Universe`, optional
        The Assets to make signal forecasts for.
    data_handler : `DataHandler`, optional
        An optional DataHandler used to preserve interface across AlphaModels.
    """

    def __init__(
            self,
            exchange_universe: ExchangeUniverse,
            pair_universe: PandasPairUniverse,
            candle_universe: GroupedCandleUniverse,
            liquidity_universe: GroupedLiquidityUniverse,
            min_liquidity,
            max_assets_per_portfolio,
            data_handler=None
    ):
        self.exchange_universe = exchange_universe
        self.pair_universe = pair_universe
        self.candle_universe = candle_universe
        self.liquidity_universe = liquidity_universe
        self.data_handler = data_handler
        self.min_liquidity = min_liquidity
        self.max_assets_per_portfolio = max_assets_per_portfolio
        self.liquidity_reached_state = {}

    def construct_shopping_basked(self, dt: pd.Timestamp, new_entries: dict) -> Dict[int, float]:
        """Construct a pair id """

        # Sort entire by volume
        sorted_by_volume = sorted(new_entries.items(), key=lambda x: x[1], reverse=True)

        # Weight all entries equally based on our maximum N entries size
        pick_count = min(len(sorted_by_volume), self.max_assets_per_portfolio)

        ts = pd.Timestamp(dt.date())

        if pick_count:
            weight = 1.0 / pick_count
            picked = {}
            for i in range(pick_count):
                pair_id, vol = sorted_by_volume[i]

                # An asset may have liquidity added, but not a single trade yet (EURS-USDC on 2020-10-1)
                # Ignore them, because we cannot backtest something with no OHLCV data
                candles = self.candle_universe.get_candles_by_pair(pair_id)

                # Note daily bars here, not open-close bars as internally used by QSTrader
                if ts not in candles["Close"]:
                    name = self.translate_pair(pair_id)
                    logger.warning("Tried to trade too early %s at %s", name, ts)
                    continue

                picked[pair_id] = weight

            return picked

        # No new feasible assets today
        return {}

    def translate_pair(self, pair_id: int) -> str:
        """Make pari ids human readable for logging."""
        pair_info = self.pair_universe.get_pair_by_id(pair_id)
        return pair_info.get_friendly_name(self.exchange_universe)

    def __call__(self, ts: pd.Timestamp) -> Dict[int, float]:
        """
        Produce the dictionary of scalar signals for
        each of the Asset instances within the Universe.

        :param ts: Candle timestamp iterator

        :return: Dict(pair_id, alpha signal)
        """

        # Refresh which cross the liquidity threshold today
        #new_entries = update_pair_liquidity_threshold(
        #    dt,
        #    self.min_liquidity,
        #    self.liquidity_reached_state,
        #    self.pair_universe,
        #    self.liquidity_universe
        #)
        #print("New entries coming to the market %zs %s", dt, new_entries)
        #picked = self.construct_shopping_basked(dt, new_entries)

        # For each pair, check the the diff between opening and closingn price

        samples = self.candle_universe.get_all_samples_by_timestamp(ts)
        return picked

print("Alpha model created")

## Setting up the strategy backtest

We have alpha model and trading universe set up, so next we will create a backtest simulation
where we feed all the data we set up for the backtest session.

In [None]:
from qstrader.asset.universe.static import StaticUniverse
from qstrader.data.backtest_data_handler import BacktestDataHandler
from qstrader.simulation.event import SimulationEvent
from qstrader.simulation.everyday import EverydaySimulationEngine
from qstrader.trading.backtest import BacktestTradingSession
from tradingstrategy.frameworks.qstrader import CapitalgramDataSource

data_source = CapitalgramDataSource(exchange_universe, pair_universe, candle_universe)

strategy_assets = list(data_source.asset_bar_frames.keys())
strategy_universe = StaticUniverse(strategy_assets)

data_handler = BacktestDataHandler(strategy_universe, data_sources=[data_source])

# Construct an Alpha Model that simply provides a fixed
# signal for the single GLD ETF at 100% allocation
# with a backtest that does not rebalance
strategy_alpha_model = MomentumAlphaModel(
    exchange_universe,
    pair_universe,
    candle_universe,
    liquidity_universe,
    min_liquidity,
    max_assets_per_portfolio)

strategy_backtest = BacktestTradingSession(
    start,
    end,
    strategy_universe,
    strategy_alpha_model,
    initial_cash=initial_cash,
    rebalance='daily',
    long_only=True,  # Spot markets do not support shorting
    cash_buffer_percentage=cash_buffer,
    data_handler=data_handler,
    simulation_engine=EverydaySimulationEngine(start, end)
)

print("Strategy set up complete")


## Running the strategy backtest

Next we run the strategy. This can take potentially many minutes, as it crunches through some data.

The notebook displays a HTML progress bar is displayed during the run, and the estimation when the simulation
is complete.

In [None]:
from tqdm.autonotebook import tqdm

max_events = len(strategy_backtest.prefetch_simulation_events())

# Run the test with a nice progress bar
with tqdm(total=max_events) as progress_bar:
    def progress_callback(idx: int, dt: pd.Timestamp, evt: SimulationEvent):
        progress_bar.set_description(f"Simulation at day {dt.date()}")
        progress_bar.update(1)

    strategy_backtest.run(progress_callback=progress_callback)

print("Backtest complete")