In [None]:
# python3 -m venv .venv
# source .venv/bin/activate
# pip install mellow_strategy_sdk

In [None]:
%load_ext autoreload
%autoreload 2

import pandas as pd
import numpy as np
import polars as pl
import math
from datetime import datetime

pd.set_option('display.max_colwidth', 70) # to fit hashes


from mellow_sdk.primitives import Pool, POOLS, MIN_TICK, MAX_TICK, Fee, Token
from mellow_sdk.data import RawDataUniV3
from mellow_sdk.strategies import AbstractStrategy, UniV3Passive
from mellow_sdk.backtest import Backtest
from mellow_sdk.positions import BiCurrencyPosition, UniV3Position
from mellow_sdk.viewers import PortfolioViewer, UniswapViewer, RebalanceViewer
from collections import deque

from IPython.display import Image

In [None]:
# Pool under study: the USDC/WETH pair at the Fee.MIDDLE fee tier
pool = Pool(tokenA=Token.USDC, tokenB=Token.WETH, fee=Fee.MIDDLE)

In [None]:
# Load the pool's raw Uniswap V3 data from the local ``data`` directory.
# Per the original note: if there is no folder or files, create and download.
raw_loader = RawDataUniV3(pool=pool, data_dir='data', reload_data=False)
data = raw_loader.load_from_folder()

# Custom Strategy

In [None]:
class CustomStrategy(AbstractStrategy):
    """
    ``Custom Strategy`` is an active strategy with rebalances.

    On the first swap event it creates a ``BiCurrencyPosition`` named ``main_vault``
    and mints a first ``UniV3Position`` spanning ``price * 0.9`` .. ``price * 1.1``.
    On every later swap it charges fees on the active position and re-mints it when

    * the swap price is outside the position's ``lower_price`` .. ``upper_price``, or
    * more than ``MAX_POSITION_AGE_SECONDS`` passed since the position was minted.

    Re-minted positions span ``price ± risk_profile * std_dev``, where ``std_dev`` is the
    standard deviation of the last ``window_size`` swap prices.

    Attributes:
        lower_price: Value given to the constructor; not read anywhere in this class.
        upper_price: Value given to the constructor; not read anywhere in this class.
        pool: Pool whose ``fee.fraction`` is used both as position fee and swap fee.
        gas_cost: Gas cost passed to every position this strategy creates.
        name: Strategy name forwarded to ``AbstractStrategy``.
        window_size: Maximum number of swap prices kept for the volatility estimate.
        risk_profile: Number of standard deviations on each side of the price
            used as the width of re-minted positions.
    """

    # Multipliers of the current price that bound the first position, and the
    # fallback band when the volatility-based band is unusable.
    INITIAL_LOWER_FACTOR = 0.9
    INITIAL_UPPER_FACTOR = 1.1

    # A position is re-minted once this many seconds passed since its mint (7 days).
    MAX_POSITION_AGE_SECONDS = 60 * 60 * 24 * 7

    def __init__(
        self,
        lower_price: float,
        upper_price: float,
        pool: Pool,
        gas_cost: float,
        name: str = None,
        window_size: int = 3000,
        risk_profile: float = 4,
    ):
        super().__init__(name)
        self.lower_price = lower_price
        self.upper_price = upper_price

        self.fee_percent = pool.fee.fraction
        self.gas_cost = gas_cost
        self.swap_fee = pool.fee.fraction

        self.last_mint_price = None
        self.last_timestamp_in_interval = None
        self.pos_num = None
        self.window_size = window_size
        self.risk_profile = risk_profile
        self.sliding_window = deque([], maxlen=self.window_size)
        self.std_dev = 0
        self.last_price_timestamp = 0

    def rebalance(self, *args, **kwargs) -> str:
        """
        Process one backtest record and rebalance the portfolio if needed.

        Args:
            kwargs["record"]: Mapping with ``timestamp``, ``event``, ``price_before``
                and ``price`` entries.
            kwargs["portfolio"]: Portfolio holding the strategy's positions.

        Returns:
            ``'init'`` when the first positions were created, ``'rebalance'`` when the
            active position was re-minted, ``None`` otherwise (including non-swap events).
        """
        record = kwargs["record"]

        # process only swap events
        if record["event"] != "swap":
            return None

        timestamp = record["timestamp"]
        portfolio = kwargs["portfolio"]
        price_before, price = record["price_before"], record["price"]

        # Push the swap price into the sliding window. The standard deviation is
        # derived from the window in create_pos, the only place it is consumed,
        # instead of being recomputed over the whole window on every swap.
        self.sliding_window.append(price)
        self.last_price_timestamp = timestamp

        if len(portfolio.positions) == 0:
            # create bicurrency position for swaps
            bi_cur = BiCurrencyPosition(
                name='main_vault',
                swap_fee=self.swap_fee,
                gas_cost=self.gas_cost,
                x=0,
                y=0,
                x_interest=None,
                y_interest=None
            )
            portfolio.append(bi_cur)

            # create first uni interval
            self.create_pos(x_in=1/price, y_in=1, price=price, timestamp=timestamp, portfolio=portfolio)
            return 'init'

        # collect fees from uni
        position_name = f'UniV3_{self.pos_num}'
        uni_pos = portfolio.get_position(position_name)
        uni_pos.charge_fees(price_0=price_before, price_1=price)

        out_of_range = price > uni_pos.upper_price or price < uni_pos.lower_price
        # last_timestamp_in_interval is only written in create_pos, so this is the
        # time elapsed since the active position was minted.
        position_age = (timestamp - self.last_timestamp_in_interval).total_seconds()
        too_old = position_age > self.MAX_POSITION_AGE_SECONDS

        if out_of_range or too_old:
            x_out, y_out = uni_pos.withdraw(price)
            portfolio.remove(position_name)
            self.create_pos(x_in=x_out, y_in=y_out, price=price, timestamp=timestamp, portfolio=portfolio)
            return 'rebalance'

        return None

    def _price_bounds(self, price):
        """
        Return ``(lower_price, upper_price)`` for the position about to be minted.

        The first position uses the fixed ``INITIAL_*_FACTOR`` band. Later positions use
        ``price ± risk_profile * std_dev``, clamped to ``1.0001 ** MIN_TICK`` ..
        ``1.0001 ** MAX_TICK`` so the lower bound can never be zero or negative.
        If the clamped band is empty (e.g. ``std_dev`` is 0), the fixed band is used.
        """
        min_price = 1.0001 ** MIN_TICK
        max_price = 1.0001 ** MAX_TICK

        default_lower = max(min_price, price * self.INITIAL_LOWER_FACTOR)
        default_upper = min(max_price, price * self.INITIAL_UPPER_FACTOR)
        if self.pos_num == 1:
            return default_lower, default_upper

        half_width = self.std_dev * self.risk_profile
        lower_price = max(min_price, price - half_width)
        upper_price = min(max_price, price + half_width)

        # `not a < b` also rejects NaN bounds
        if not lower_price < upper_price:
            return default_lower, default_upper
        return lower_price, upper_price

    def create_pos(self, x_in, y_in, price, timestamp, portfolio):
        """
        Swaps x_in, y_in in right proportion and mints them to a new interval.

        Args:
            x_in: Amount of the first token to put into the new position.
            y_in: Amount of the second token to put into the new position.
            price: Current price, used for the swap and the mint.
            timestamp: Timestamp of the event that triggered the mint.
            portfolio: Portfolio that contains ``main_vault`` and receives the
                new ``UniV3_<n>`` position.
        """
        if self.pos_num is None:
            self.pos_num = 1
        else:
            self.pos_num += 1

        # refresh the volatility estimate from the prices seen so far;
        # with an empty window the previous value is kept
        if len(self.sliding_window) > 0:
            self.std_dev = float(np.std(self.sliding_window))

        # bicurrency position that can swap tokens
        bi_cur = portfolio.get_position('main_vault')

        # add tokens to bicurrency position
        bi_cur.deposit(x_in, y_in)

        lower_price, upper_price = self._price_bounds(price)

        # new uni position
        uni_pos = UniV3Position(
            name=f'UniV3_{self.pos_num}',
            lower_price=lower_price,
            upper_price=upper_price,
            fee_percent=self.fee_percent,
            gas_cost=self.gas_cost
        )

        # add new position to portfolio
        portfolio.append(uni_pos)

        # uni_pos.aligner is UniswapLiquidityAligner, good class for working with liquidity operations
        dx, dy = uni_pos.aligner.get_amounts_for_swap_to_optimal(
            x_in, y_in, swap_fee=bi_cur.swap_fee, price=price
        )

        # swap tokens to right proportion (if price in interval swaps to equal liquidity in each token)
        if dx > 0:
            bi_cur.swap_x_to_y(dx, price=price)
        if dy > 0:
            bi_cur.swap_y_to_x(dy, price=price)

        x_uni, y_uni = uni_pos.aligner.get_amounts_after_optimal_swap(
            x_in, y_in, swap_fee=bi_cur.swap_fee, price=price
        )

        # withdraw tokens from bicurrency
        # because of float numbers precision subtract 1e-9
        bi_cur.withdraw(x_uni - 1e-9, y_uni - 1e-9)

        # deposit tokens to uni
        uni_pos.deposit(x_uni, y_uni, price=price)

        # remember last mint price
        self.last_mint_price = price

        # remember when the active position was minted (read by rebalance's age check)
        self.last_timestamp_in_interval = timestamp



In [None]:
# Minimum and maximum swap price in the loaded data, handed to the strategy
# as its lower/upper price arguments
swap_prices = data.swaps['price']
strat = CustomStrategy(
    lower_price=swap_prices.min(),
    upper_price=swap_prices.max(),
    pool=pool,
    gas_cost=0,
    name='custom',
)

# Run the strategy over the swaps and keep the three resulting histories
bt = Backtest(strategy=strat)
backtest_result = bt.backtest(df=data.swaps)
portfolio_history, rebalance_history, uni_history = backtest_result

In [None]:
# One viewer per history produced by the backtest
pv = PortfolioViewer(portfolio_history, pool)
uv = UniswapViewer(uni_history)
rv = RebalanceViewer(rebalance_history)

# Portfolio stats figures (value, fees earned, apy, ...)
portfolio_figures = pv.draw_portfolio()
fig1, fig2, fig3, fig4, fig5, fig6 = portfolio_figures

# Uniswap intervals drawn over the swaps
intervals_plot = uv.draw_intervals(data.swaps)

# Rebalances drawn over the swaps
rebalances_plot = rv.draw_rebalances(data.swaps)

# Table with portfolio stats
stats = portfolio_history.calculate_stats()

In [None]:
# Number of rebalances = number of rows in the rebalance history table
rebalance_df = rv.rebalance_history.to_df()
rebalance_df.shape[0]

In [None]:
# Show the rebalances chart inline, then export it as a PNG in the working directory
rebalances_png = 'catch_rebalances.png'
rebalances_plot.show()
rebalances_plot.write_image(rebalances_png)

In [None]:
# Show the intervals chart inline, then export it as a PNG in the working directory
intervals_png = 'catch_intervals.png'
intervals_plot.show()
intervals_plot.write_image(intervals_png)

In [None]:
# Show the second portfolio figure inline, then export it as a PNG in the working directory
fig2_png = 'catch_fig2.png'
fig2.show()
fig2.write_image(fig2_png)

In [None]:
# Show the sixth portfolio figure inline, then export it as a PNG in the working directory
fig6_png = 'catch_fig6.png'
fig6.show()
fig6.write_image(fig6_png)

In [None]:
# Peek at the tail (2 rows) of the table returned by portfolio_history.calculate_stats()
stats.tail(2)

In [None]:
# fast draw all in one cell
display(
    *[
        Image(i.update_layout(height=300,width=700).to_image(format='png')) 
        for i in [intervals_plot, rebalances_plot, fig2, fig4, fig6]
    ]
)