# Backtester Functions

In [1]:
import pandas as pd
import pandas_gbq
import pydata_google_auth
import numpy as np
import matplotlib.pyplot as plt
import math
import calendar

from tqdm import tqdm
from datetime import timedelta
from functools import reduce
pd.options.mode.chained_assignment = None

## Class 

In [2]:
class LP_Rebalance:
    def __init__(
        self,
        token_0: str,
        token_1: str,
        fee: int, # in bps
        local_dir: str,
        start_t: str,
        end_t: str,
        range_perc: float, # i.e. +/- 30%
        col_ratio: float=100, # i.e. 100%
        com_ratio: float=0, # i.e. 0bps
        PLP_annual_yield: float=0, # i.e. 0%
        spread_mult: float=1,
        pool_data: pd.DataFrame=None,
        inverse_price: bool=True,
        legs: dict=None,
        delta: dict=None,
    ) -> None:
        """
        Initialize LP Rebalancing class

        :token_0 Token 0
        :token_1 Token 1
        :fee fee rate in bps
        :local_dir local directory to save plots
        :start_t start time (inclusive) (i.e. '2021-05-06 00:06:12' or '2021-05-06')
        :end_t end time (exclusive)
        :range_perc range percent (for +/- 30%, input 30)
        :col_ratio collateral ratio (%) - assumes position is never underwater or liquidated
        :com_ratio commission ratio (bps)
        :PLP_annual_yield annual yield from providing Panoptic liquidity (%)
        :spread_mult spread multiplier for buying popular options
        :pool_data enter dataframe of pool data from another strategy to save load time. Default=None
            (used as-is; it is not passed through transform_pool_data again)
        :inverse_price invert the price (if true: gets price of token_1 in terms of token_0)
        :legs type of option ['SHORT_PUT', 'LONG_PUT', 'SHORT_CALL', 'LONG_CALL'] and quantity.
            Keys are case-insensitive. Default: {'SHORT_PUT': 1}
        :delta target delta per option type, keyed like `legs`. Default: {'SHORT_PUT': 0.5}

        Raises ValueError if `legs` contains an unknown option type.
        """
        self.token_0 = token_0
        self.token_1 = token_1
        self.fee_rate = fee / 10_000
        self.pool_address = LP_Rebalance.get_pool_address(self.token_0, self.token_1, fee)
        self.dec_0 = LP_Rebalance.get_token_dec(self.token_0)
        self.dec_1 = LP_Rebalance.get_token_dec(self.token_1)
        self.tick_spacing = LP_Rebalance.get_tick_spacing(fee)
        # Our_capital is 1 b/c it allows results to be interpreted as percentages.
        # Strategy does accounting in numeraire token,
        # i.e. start with 1 token of numeraire, split 50/50 to LP.
        # At end of rebalancing period (e.g. 1 day), we sell everything back into the numeraire token
        # and calculate the % change (return) on the numeraire token.
        self.our_capital = 1 # in token y
        self.raw_dir = local_dir
        self.start_t = start_t
        self.end_t = end_t
        # Stored as a multiplicative range factor: 30 (%) -> 1.3
        self.range_perc = (range_perc / 100) + 1
        self.tick_width = LP_Rebalance.convert_r_to_tick_width(self.range_perc)
        self.col_ratio = col_ratio / 100
        self.com_ratio = com_ratio / 10_000
        self.collateral = self.our_capital * self.col_ratio
        self.PLP_annual_yield = PLP_annual_yield / 100
        self.spread_mult = spread_mult
        # If inverse_price, our numeraire is token_0
        # Else, our numeraire is token_1
        # Ex: For USDC/ETH pool:
        #   token 0 = USDC
        #   token 1 = ETH
        #   inverse_price = True -> we calculate returns in terms of USDC
        #   inverse_price = False -> we calculate returns in terms of ETH
        # This will affect results!
        self.inverse_price = inverse_price

        # Build fresh dicts on every call (no shared mutable defaults) and
        # upper-case the keys so they match the option names used by
        # create_strat / get_fee_summary / calc_position.
        if legs is None:
            legs = {'SHORT_PUT': 1}
        if delta is None:
            delta = {'SHORT_PUT': 0.5}
        legs = {str(name).upper(): quantity for name, quantity in legs.items()}
        delta = {str(name).upper(): value for name, value in delta.items()}

        valid_options = ['SHORT_PUT', 'LONG_PUT', 'SHORT_CALL', 'LONG_CALL']
        for leg in legs:
            if leg not in valid_options:
                raise ValueError(f"Invalid option type. Valid types: {valid_options}")

        # run_strat pairs leg i (position in `legs`) with the strike columns
        # create_strat builds for delta i (position in `delta`), so keep both
        # dicts in the same order whenever they describe the same legs.
        if set(delta) == set(legs):
            delta = {leg: delta[leg] for leg in legs}

        self.legs = legs
        self.delta = delta

        # Uniswap v3 tick base: raw price = BASE ** tick
        self.BASE = 1.0001

        if pool_data is None:
            self.load_pool_data()
        else:
            self.data = pool_data
        

    def load_pool_data(self):
        """Fetch this pool's Univ3 swap rows from BigQuery, then transform them.

        Replace this function with your own GBQ data fetcher if needed.
        See: https://github.com/panoptic-labs/research/blob/JP/_research-bites/DataTutorial/tutorial.ipynb

        Stores the raw query result in `self.data` and immediately hands it to
        `transform_pool_data()`.
        """
        print("Loading Data...")

        gbq_project = "arcane-world-371019"
        oauth_scopes = [
            'https://www.googleapis.com/auth/cloud-platform',
            'https://www.googleapis.com/auth/drive',
        ]
        # Obtains user credentials through a local-webserver auth flow
        user_credentials = pydata_google_auth.get_user_credentials(
            oauth_scopes,
            auth_local_webserver=True,
        )

        # Time window is [start_t, end_t); rows come back in on-chain order
        swaps_sql = f"""
        SELECT DISTINCT *
        FROM `arcane-world-371019.First_sync.1`
        WHERE address = '{self.pool_address}'
            AND block_timestamp >= '{self.start_t}'
            AND block_timestamp < '{self.end_t}'
        ORDER BY block_number, transaction_index
        """
        self.data = pandas_gbq.read_gbq(
            swaps_sql,
            project_id=gbq_project,
            credentials=user_credentials,
        )
        self.transform_pool_data()

    def transform_pool_data(self):
        """Transform amounts to human-readable format"""
        self.data['amount0'] = self.data['amount0'].apply(LP_Rebalance.get_twos_comp)
        self.data['amount1'] = self.data['amount1'].apply(LP_Rebalance.get_twos_comp)

        self.data['amount0'] = self.data['amount0'] / (10 ** self.dec_0)
        self.data['amount1'] = self.data['amount1'] / (10 ** self.dec_1)

        self.data['price'] = self.data['tick'].apply(self.convert_tick)

        # Note our GBQ query ordered by blocknumber, transaciton_index
        self.data.set_index('block_timestamp', inplace=True) # Assume block timestamps are accurate "enough" for our use-case
        self.data.index = pd.to_datetime(self.data.index)

        # Unpack sqrt price
        self.data['sqrtPrice'] = self.data['sqrtPrice'].apply(LP_Rebalance.unpack_sqrtprice)

        # Calculate sqrt price change
        self.data = LP_Rebalance.calc_sqrtprice_change(self.data)

        self.data['date'] = self.data.index.date
        # Lag prices (Have to do for illiquid pools w/few txs per day)
        self.data['price_lag'] = self.data['price'].shift()
        self.data['tick_lag'] = self.data['tick'].shift()
        self.data['sqrtPrice_lag'] = self.data['sqrtPrice'].shift()
        self.data = self.data.iloc[1:]

    def run_strat(self):
        """
        Run the daily, weekly and monthly rebalancing backtests end to end.

        Steps (all results are stored as attributes on the instance):
        1. create_strat() builds self.daily / self.weekly / self.monthly.
        2. Per-swap liquidity ('liq_per_tick') and fees ('our_fee', 'fee_asset') are added to each frame.
        3. Fees are summarised per period and per leg (self.*_fees_legs), scaled by the
           leg quantity and summed across legs into self.daily_fees / weekly_fees / monthly_fees.
        4. Position PnL is computed the same way into self.daily_pos / weekly_pos / monthly_pos.
        5. Commission and PLP-yield frames are built from the current settings.

        Percentage columns are expressed relative to (collateral * total leg quantity).
        """
        print("Running Strategy")
        self.create_strat()

        # Calculate liquidity per tick
        tqdm().pandas(desc='Step 1')
        self.daily['liq_per_tick'] = self.daily.progress_apply(self.calc_liq_per_tick, axis=1)
        self.weekly['liq_per_tick'] = self.weekly.progress_apply(self.calc_liq_per_tick, axis=1)
        self.monthly['liq_per_tick'] = self.monthly.progress_apply(self.calc_liq_per_tick, axis=1)

        # Calculate fees
        tqdm().pandas(desc='Step 2')
        self.daily[['our_fee', 'fee_asset']] = self.daily.progress_apply(self.calc_fees, axis=1)
        self.weekly[['our_fee', 'fee_asset']] = self.weekly.progress_apply(self.calc_fees, axis=1)
        self.monthly[['our_fee', 'fee_asset']] = self.monthly.progress_apply(self.calc_fees, axis=1)

        # Total number of option contracts across all legs (denominator for the *_perc columns)
        quantity_total = sum(self.legs.values())

        # Daily fees: one summary per leg, scaled by quantity, then summed across legs
        self.daily_fees_legs = {}
        for leg, quantity in self.legs.items():
            self.daily_fees_legs[leg] = self.daily.groupby('date').apply(self.get_fee_summary, leg) * quantity
        self.daily_fees = reduce(lambda x, y: x.add(y, fill_value=0), self.daily_fees_legs.values())
        # get_fee_summary returns an unnamed one-element Series, hence column 0
        self.daily_fees.rename(columns={0: 'fees_total'}, inplace=True)
        self.daily_fees['fees_perc'] = self.daily_fees['fees_total'] / (self.collateral * quantity_total)

        # Weekly fees: same aggregation over W-MON bins (closed on the left)
        self.weekly_fees_legs = {}
        for leg, quantity in self.legs.items():
            self.weekly_fees_legs[leg] = self.weekly.groupby([pd.Grouper(level = 'block_timestamp', freq = 'W-MON', closed='left')]).apply(self.get_fee_summary, leg) * quantity
        self.weekly_fees = reduce(lambda x, y: x.add(y, fill_value=0), self.weekly_fees_legs.values())
        self.weekly_fees.rename(columns={0: 'fees_total'}, inplace=True)
        self.weekly_fees['fees_perc'] = self.weekly_fees['fees_total'] / (self.collateral * quantity_total)
        # Replace the timestamp bin labels with plain dates
        self.weekly_fees.index = self.weekly_fees.index.date
        self.weekly_fees.index.name = 'date'

        # Monthly fees: same aggregation per (year, month)
        self.monthly_fees_legs = {}
        for leg, quantity in self.legs.items():
            self.monthly_fees_legs[leg] = self.monthly.groupby(['year', 'month']).apply(self.get_fee_summary, leg) * quantity
        self.monthly_fees = reduce(lambda x, y: x.add(y, fill_value=0), self.monthly_fees_legs.values())
        self.monthly_fees.rename(columns={0: 'fees_total'}, inplace=True)
        self.monthly_fees['fees_perc'] = self.monthly_fees['fees_total'] / (self.collateral * quantity_total)
        # Re-label each (year, month) row with the date of that month's last day
        y = self.monthly_fees.index.get_level_values('year')
        m = self.monthly_fees.index.get_level_values('month')
        self.monthly_fees.index = pd.to_datetime(y * 10000 + m * 100 + 1, format="%Y%m%d")
        self.monthly_fees.index = [dti.replace(day = calendar.monthrange(dti.year, dti.month)[1]).date() for dti in self.monthly_fees.index]
        self.monthly_fees.index.name = 'date'
        # Add a row at the first daily date with no fee total and 0% fees
        # NOTE(review): presumably a starting point for cumulative series/plots - confirm
        self.monthly_fees.loc[self.daily_fees.index[0]] = [None, 0]
        self.monthly_fees.sort_index(inplace=True)

        # Calculate PnL
        tqdm().pandas(desc='Step 3')
        # leg_number i selects the strike_start_{i} / price_a_{i} / price_b_{i} columns built by create_strat
        self.daily_pos_legs = {}
        for i, (leg, quantity) in enumerate(self.legs.items()):
            self.daily_pos_legs[leg] = self.daily.groupby('date').progress_apply(self.calc_position, option_type=leg, leg_number=i) * quantity
        self.daily_pos = reduce(lambda x, y: x.add(y, fill_value=0), self.daily_pos_legs.values())
        # Assumes same collateralization for each leg (TODO: allow customizable collateral)
        self.daily_pos['pnl_perc'] = self.daily_pos['pnl'] / (self.collateral * quantity_total)

        self.weekly_pos_legs = {}
        for i, (leg, quantity) in enumerate(self.legs.items()):
            self.weekly_pos_legs[leg] = self.weekly.groupby([pd.Grouper(level = 'block_timestamp', freq = 'W-MON', closed='left')]).progress_apply(self.calc_position, option_type=leg, leg_number=i) * quantity
        self.weekly_pos = reduce(lambda x, y: x.add(y, fill_value=0), self.weekly_pos_legs.values())
        self.weekly_pos['pnl_perc'] = self.weekly_pos['pnl'] / (self.collateral * quantity_total)
        self.weekly_pos.index = self.weekly_pos.index.date
        self.weekly_pos.index.name = 'date'

        self.monthly_pos_legs = {}
        for i, (leg, quantity) in enumerate(self.legs.items()):
            self.monthly_pos_legs[leg] = self.monthly.groupby(['year', 'month']).progress_apply(self.calc_position, option_type=leg, leg_number=i) * quantity
        self.monthly_pos = reduce(lambda x, y: x.add(y, fill_value=0), self.monthly_pos_legs.values())
        self.monthly_pos['pnl_perc'] = self.monthly_pos['pnl'] / (self.collateral * quantity_total)

        # Same month-end re-labelling as for monthly_fees
        y = self.monthly_pos.index.get_level_values('year')
        m = self.monthly_pos.index.get_level_values('month')
        self.monthly_pos.index = pd.to_datetime(y * 10000 + m * 100 + 1, format="%Y%m%d")
        self.monthly_pos.index = [dti.replace(day = calendar.monthrange(dti.year, dti.month)[1]).date() for dti in self.monthly_pos.index]
        self.monthly_pos.index.name = 'date'
        # Row at the first daily date: the six calc_position columns are empty, pnl_perc is 0
        self.monthly_pos.loc[self.daily_pos.index[0]] = [None, None, None, None, None, None, 0]
        self.monthly_pos.sort_index(inplace=True)

        # Calculate Commissions paid
        # (the update_* methods take bps / %, so convert the stored fractions back)
        self.update_commissions(self.com_ratio * 10_000)

        # Calculate PLP yield earned
        self.update_PLP_yield(self.PLP_annual_yield * 100)

    def update_commissions(self, com_ratio: float) -> None:
        """
        Update commission ratio

        :com_ratio commission ratio (bps)
        """
        self.com_ratio = com_ratio / 10_000
        coms_total = -1 * self.com_ratio * self.our_capital
        coms_perc = coms_total / self.collateral
        self.daily_com = pd.DataFrame({'coms_total': coms_total, 'coms_perc': coms_perc}, index=self.daily_pos.index)
        self.weekly_com = pd.DataFrame({'coms_total': coms_total, 'coms_perc': coms_perc}, index=self.weekly_pos.index)
        self.monthly_com = pd.DataFrame({'coms_total': coms_total, 'coms_perc': coms_perc}, index=self.monthly_pos.index)

    def update_PLP_yield(self, PLP_annual_yield: float) -> None:
        """
        Update Panoptic LP annual yield

        :PLP_annual_yield PLP annual yield (%)
        """
        self.PLP_annual_yield = PLP_annual_yield / 100
        daily_yield = self.PLP_annual_yield / 365
        weekly_yield = self.PLP_annual_yield / 52
        monthly_yield = self.PLP_annual_yield / 12
        self.daily_PLP_yield = pd.DataFrame({'PLP_yield_total': daily_yield * self.collateral, 'PLP_yield_perc': daily_yield}, index=self.daily_pos.index)
        self.weekly_PLP_yield = pd.DataFrame({'PLP_yield_total': weekly_yield * self.collateral, 'PLP_yield_perc': weekly_yield}, index=self.weekly_pos.index)
        self.monthly_PLP_yield = pd.DataFrame({'PLP_yield_total': monthly_yield * self.collateral, 'PLP_yield_perc': monthly_yield}, index=self.monthly_pos.index)

    def update_spread_mult(self, spread_mult: float) -> None:
        """
        Update spread multiplier

        :spread_mult spread multiplier (e.g. 1.25X)
        """ 
        for fees in [self.daily_fees, self.weekly_fees, self.monthly_fees]:
            fees['fees_total'] *= spread_mult / self.spread_mult
            fees['fees_perc'] = fees['fees_total'] / self.collateral
        self.spread_mult = spread_mult

    def create_strat(self):
        """
        Define Rebalancing Strategies
        Define 3 strategies: LP +/- X% of start price and rebalance (1) daily, (2) weekly, and (3) monthly
        -Assumes no rebalancing costs in terms of gas fees or slippage/swap fees
        -Assumes no reinvestment/LPing of collected fees
        """
        daily = self.data.copy()
        weekly = self.data.copy()
        monthly = self.data.copy()

        #TODO: refactor so backtester can choose different deltas for different legs
        long_options = ['LONG_PUT', 'LONG_CALL']
        put_options = ['SHORT_PUT', 'LONG_PUT']
        daily['price_start'] = daily.groupby('date')['price_lag'].transform(lambda x: x[0])
        daily['tick_start'] = daily.groupby('date')['tick_lag'].transform(lambda x: x[0])
        daily['sqrtPrice_start'] = daily.groupby('date')['sqrtPrice_lag'].transform(lambda x: x[0])

        for i, leg in enumerate(self.delta):
            is_long = True if leg in long_options else False
            is_put = True if leg in put_options else False
            daily[f'strike_start_{i}'] = (
                daily
                .groupby('date')['price_start']
                .transform(
                    lambda group_prices: self.strike_from_delta(
                        spot_price   = group_prices.iloc[0],
                        delta        = self.delta[leg],
                        range_factor = self.range_perc,
                        is_long      = is_long,
                        is_put       = is_put
                    )
                )
            )
            daily[f'strike_tick_start_{i}'] = (
                daily
                .groupby('date')[f'strike_start_{i}']
                .transform(
                    lambda group_strikes: self.convert_price_to_tick(
                        p=group_strikes.iloc[0]
                    )
                )
            )

            daily[f'price_a_{i}'] = daily[f'strike_start_{i}'] / self.range_perc
            daily[f'price_b_{i}'] = daily[f'strike_start_{i}'] * self.range_perc
        self.daily = daily

        weekly['price_start'] = weekly.groupby([pd.Grouper(level = 'block_timestamp', freq = 'W-MON', closed='left')])['price_lag'].transform(lambda x: x[0])
        weekly['tick_start'] = weekly.groupby([pd.Grouper(level = 'block_timestamp', freq = 'W-MON', closed='left')])['tick_lag'].transform(lambda x: x[0])
        weekly['sqrtPrice_start'] = weekly.groupby([pd.Grouper(level = 'block_timestamp', freq = 'W-MON', closed='left')])['sqrtPrice_lag'].transform(lambda x: x[0])
        for i, leg in enumerate(self.delta):
            # Rebalance every Monday (UTC)
            weekly[f'strike_start_{i}'] = (
                weekly
                .groupby([pd.Grouper(level='block_timestamp', freq='W-MON', closed='left')])['price_start']
                .transform(
                    lambda group_prices: self.strike_from_delta(
                        spot_price   = group_prices.iloc[0],
                        delta        = self.delta[leg],
                        range_factor = self.range_perc,
                        is_long      = is_long,
                        is_put       = is_put
                    )
                )
            )
            weekly[f'strike_tick_start_{i}'] = (
                weekly
                .groupby([pd.Grouper(level='block_timestamp', freq='W-MON', closed='left')])[f'strike_start_{i}']
                .transform(
                    lambda group_strikes: self.convert_price_to_tick(
                        p=group_strikes.iloc[0]
                    )
                )
            )
            weekly[f'price_a_{i}'] = weekly[f'strike_start_{i}'] / self.range_perc
            weekly[f'price_b_{i}'] = weekly[f'strike_start_{i}'] * self.range_perc
        self.weekly = weekly

        monthly['month'] = monthly.index.month
        monthly['year'] = monthly.index.year
        monthly['price_start'] = monthly.groupby(['year', 'month'])['price_lag'].transform(lambda x: x[0])
        monthly['tick_start'] = monthly.groupby(['year', 'month'])['tick_lag'].transform(lambda x: x[0])
        monthly['sqrtPrice_start'] = monthly.groupby(['year', 'month'])['sqrtPrice_lag'].transform(lambda x: x[0])
        for i, leg in enumerate(self.delta):
            # Rebalance on the 1st of the month
            monthly[f'strike_start_{i}'] = (
                monthly
                .groupby(['year', 'month'])['price_start']
                .transform(
                    lambda group_prices: self.strike_from_delta(
                        spot_price   = group_prices.iloc[0],  # first spot in this group
                        delta        = self.delta[leg],
                        range_factor = self.range_perc,
                        is_long      = is_long,
                        is_put       = is_put
                    )
                )
            )
            monthly[f'strike_tick_start_{i}'] = (
                monthly
                .groupby(['year', 'month'])[f'strike_start_{i}']
                .transform(
                    lambda group_strikes: self.convert_price_to_tick(
                        p = group_strikes.iloc[0]
                    )
                )
            )

            monthly[f'price_a_{i}'] = monthly[f'strike_start_{i}'] / self.range_perc
            monthly[f'price_b_{i}'] = monthly[f'strike_start_{i}'] * self.range_perc
        self.monthly = monthly

    def calc_fees(self, s: pd.Series) -> pd.Series:
        """Calculate fee collected by us per swap
        (Uses sqrtPrice and liq_per_tick)"""
        # We ignore decimals for sqrtPrice - they end up cancelling out in calc_liq_per_tick(), calc_fees(), and get_fee_summary()
        # Our fees
        # TODO: current code assumes all legs are the same width, so fee per liquidity is the same
        # TODO: can update code to calculate fees per liquidity for differing leg widths
        low_bound = np.sqrt(self.BASE ** (s['strike_tick_start_0'] - self.tick_width))
        up_bound = np.sqrt(self.BASE ** (s['strike_tick_start_0'] + self.tick_width))
        prev_adj = np.clip(s['prev_sqrtPrice'], low_bound, up_bound)
        curr_adj = np.clip(s['sqrtPrice'], low_bound, up_bound)

        if s['change_sqrtPrice'] >= 0: # price of token 0 in token 1 went up -> fee collected in token 1
            fee_asset = self.token_1
            our_fee = s['liq_per_tick'] * (curr_adj - prev_adj) * self.fee_rate
        
        else: # price of token 0 in token 1 went down -> fee collected in token 0
            fee_asset = self.token_0
            our_fee = s['liq_per_tick'] * ((1 / curr_adj) - (1 / prev_adj)) * self.fee_rate

        return pd.Series([our_fee, fee_asset])

    def get_fee_summary(self, df: pd.DataFrame, option_type: str) -> pd.Series:
        """Get daily summary statistics of our collected fees"""
        # We ignore decimals for sqrtPrice - they end up cancelling out in calc_liq_per_tick(), calc_fees(), and get_fee_summary()
        price_close = df['price'][-1]
        sqrtPrice_close = df['sqrtPrice'][-1]

        # Our performance
        fees_0 = df.loc[df['fee_asset'] == self.token_0, 'our_fee'].sum()
        fees_1 = df.loc[df['fee_asset'] == self.token_1, 'our_fee'].sum()

        fees_total = fees_1 + fees_0 * (sqrtPrice_close ** 2) # in terms of token 1 (with dec_1)

        if self.inverse_price:
            fees_total *= price_close # in terms of token 0

        fees_total *= self.spread_mult # Fees owed to seller are multiplied by "spread" for popular strike prices
        
        short_options = ['SHORT_PUT', 'SHORT_CALL']
        long_options = ['LONG_PUT', 'LONG_CALL']
        if option_type in long_options:
            fees_total *= -1 # Fees are owed

        # fees_perc = fees_total / self.collateral

        return pd.Series([
                        fees_total,
                        ])

    def calc_position(self, df: pd.DataFrame, option_type: str, leg_number: int) -> pd.Series:
        """
        Calculates strategy position in token y, the numeraire
        (if inverse_price then this is token_0, else token_1)
        and daily returns (excluding fees).
        Expects df to be LP data of swaps.

        p_a: lower price range
        p_b: upper price range
        p_0: mid-price (initial price)
        x_0: initial amount of token x
        y_0: initial amount of token y

        p_1: price at beginning of day (same as p_0
            for daily rebalancing but different for
            weekly and monthly rebalancing)
        x_1: amount of token x at BOD
        y_1: amount of token y at BOD

        p: current (close) price at EOD
        x: current amount of token x
        y: current amount of token y
        IL: impermanent loss (loss compared to HODLing)

        """
        # Initial LP price range and positions
        p_a = df[f'price_a_{leg_number}'][-1]
        p_b = df[f'price_b_{leg_number}'][-1]
        p_0 = df['price_start'][-1]

        # Calculate liquidity factor L
        x_c = self.our_capital / df[f'strike_start_{leg_number}'][-1] # If price is below LP position, then LP position is entirely composed of (1 / strike) ETH
        y_c = self.our_capital # If price is above LP position, then LP position is entirely composed of 1 USDC
        # https://atiselsts.github.io/pdfs/uniswap-v3-liquidity-math.pdf
        L_x = x_c * ((np.sqrt(p_a) * np.sqrt(p_b)) / (np.sqrt(p_b) - np.sqrt(p_a)))
        L_y = y_c / (np.sqrt(p_b) - np.sqrt(p_a))
        L = min(L_x, L_y)
        
        # Calculate starting token X and Y amounts needed to deposit into the actual LP position based on starting price and price range
        x_0 = L * (np.sqrt(p_b) - np.sqrt(p_0)) / (np.sqrt(p_0) * np.sqrt(p_b))
        y_0 = L * (np.sqrt(p_0) - np.sqrt(p_a))
        pos_0_y = (x_0 * p_0) + y_0
        pos_0_x = (y_0 / p_0) + x_0

        # End of time period position value
        p = df['price'][-1] # close (current) price
        x = L * (np.sqrt(p_b) - np.sqrt(p)) / (np.sqrt(p) * np.sqrt(p_b))
        y = L * (np.sqrt(p) - np.sqrt(p_a))
        pos_y = x * p + y # in token y
        pos_x = pos_y / p

        if option_type == 'SHORT_PUT':
            # Same as LPing
            pnl_y = pos_y - pos_0_y # in token y
        elif option_type == 'LONG_PUT':
            # Option buyer owns pos_0_y (our_capital) since LP position immediately swapped to token y when borrowed
            # Option buyer owes pos_y to option seller
            pnl_y = pos_0_y - pos_y # in token y
            
        elif option_type == 'SHORT_CALL': #TODO: use this instead: # (self.our_capital / strike) * (p_0 - p)
            # LPing but with borrowed token x
            # Short call = short put + short token x
            pnl_short_put = pos_y - pos_0_y # in token y
            pnl_short_risky_asset_y = (self.our_capital / df[f'strike_start_{leg_number}'][-1]) * (p_0 - p)
            pnl_y = pnl_short_put + pnl_short_risky_asset_y
        elif option_type == 'LONG_CALL':
            # Opposite of Short Call
            pnl_short_put = pos_y - pos_0_y # in token y
            pnl_short_risky_asset_y = (self.our_capital / df[f'strike_start_{leg_number}'][-1]) * (p_0 - p)
            pnl_y = -1 * (pnl_short_put + pnl_short_risky_asset_y)

        return pd.Series({
            'x_0': x_0,
            'y_0': y_0,
            'x': x,
            'y': y,
            'pos': pos_y,
            'pnl': pnl_y,
        })

    def convert_price(self, p: float) -> float:
        """Gets (inverse) price (convenient for pairs like USDC/ETH)
        Inverse price: price of token1 in terms of token0
        Regular price: price of token0 in terms of token1"""
        if self.inverse_price:
            return 10 ** (self.dec_1 - self.dec_0) / p
        else:
            return p / (10 ** (self.dec_1 - self.dec_0))

    def convert_tick(self, tick: int) -> float:
        """Converts tick to price"""
        return self.convert_price(self.BASE ** tick)

    def convert_price_to_tick(self, p: float) -> float:
        """Converts price to tick"""    
        return math.log(self.convert_price(p), self.BASE)

    def calc_liq_per_tick(self, s: pd.Series) -> float:
        """
        Calculates our liquidity per tick
        (see https://atiselsts.github.io/pdfs/uniswap-v3-liquidity-math.pdf)
        """
        ''' Do not invert prices for calculating liquidity per tick
        (This is an input to self.calc_fees() which uses sqrtPrice and assumes no inverted price)
        Note that sqrtPrice should be multiplied by 10 ^ [(dec0 - dec1) / 2]
        (or alternative multiply sqrtPrice ^2 by 10 ^ (dec0 - dec1))
        to achieve correct decimal places
        But we ignore decimals here, as they end up cancelling out in anyway in calc_liq_per_tick(), calc_fees(), and get_fee_summary() '''
        price = s['sqrtPrice_start'] ** 2
        p_a = price / self.range_perc

        p_0 = s['price_start']
        if self.inverse_price:
            y_0 = (self.our_capital / p_0) / 2
        else:
            y_0 = self.our_capital / 2 # In the case of USDC this is just $0.50

        # # Initial liquidity per tick
        L = y_0 / (np.sqrt(price) - np.sqrt(p_a))
        return L

    @staticmethod
    def get_delta(spot_price, strike, range_factor, is_long, is_put):  
        return_multiplier = -1 if is_long else 1

        if spot_price < (strike / range_factor):
            return return_multiplier if is_put else 0
        elif spot_price > (strike * range_factor):
            return 0 if is_put else -1 * return_multiplier
        elif is_put:
            return return_multiplier * ((np.sqrt((strike * range_factor) / spot_price) - 1) / (range_factor - 1))
        else:
            return return_multiplier * (((np.sqrt((strike * range_factor) / spot_price) - 1) / (range_factor - 1)) - 1)

    @staticmethod
    def strike_from_delta(
        spot_price: float,
        delta: float,
        range_factor: float,
        is_long: bool,
        is_put: bool) -> float:
        """
        Inverts the original delta(...) function. Given a desired delta,
        solves for the strike that would produce that delta, under the 
        piecewise rules.

        NOTE: When delta is exactly 0 or ±1, the 'solution' is actually 
            a range of strikes. This function returns the *boundary* 
            strike (spot_price * range_factor, or spot_price/range_factor) 
            that *just* satisfies the region transition.
        """
        if (is_long and is_put) or not (is_long or is_put):
            assert(delta >= -1 and delta <= 0)
        else:
            assert(delta >= 0 and delta <= 1)

        return_multiplier = -1 if is_long else 1
        # -------------------------------
        # 1) Handle put options
        # -------------------------------
        if is_put:
            # Region 1 (put): delta = return_multiplier => => "far ITM"
            # => spot_price < strike / range_factor => i.e. strike > spot_price * range_factor
            # We'll pick boundary: strike = spot_price * range_factor.
            if delta == return_multiplier:
                # e.g. if is_long => delta = -1 => strike >= spot_price * range_factor
                # pick boundary:
                return spot_price * range_factor

            # Region 2 (put): delta = 0 => => "far OTM"
            # => spot_price > strike * range_factor => i.e. strike < spot_price / range_factor
            # We'll pick boundary: strike = spot_price / range_factor.
            elif delta == 0:
                return spot_price / range_factor

            # Region 3 (put):  0 < |delta| < 1
            # Solve:  delta = return_multiplier * ( (sqrt(...) - 1) / (range_factor - 1) )
            # eq = delta / return_multiplier
            else:
                eq = delta / return_multiplier
                x  = 1 + eq * (range_factor - 1)   # sqrt((strike * range_factor)/spot_price)

                # Guard against negative under sqrt, or negative strike
                if x <= 0:
                    raise ValueError(
                        f"Cannot invert delta={delta} in the 'middle region' for a put. "
                        f"Computed x={x} <= 0 => no real solution."
                    )
                strike = (x**2 * spot_price) / range_factor
                return strike

        # -------------------------------
        # 2) Handle call options (not is_put)
        # -------------------------------
        else:
            # Region 1 (call): delta = 0 => => "far OTM"
            # => spot_price < strike / range_factor => => strike > spot_price * range_factor
            # We'll pick boundary: strike = spot_price * range_factor
            if delta == 0:
                return spot_price * range_factor

            # Region 2 (call): delta = -1 * return_multiplier => => "far ITM"
            # => spot_price > strike * range_factor => => strike < spot_price / range_factor
            # We'll pick boundary: strike = spot_price / range_factor
            elif delta == -1 * return_multiplier:
                return spot_price / range_factor

            # Region 3 (call): solve
            #  delta = return_multiplier * ( (sqrt(...) - 1)/(range_factor - 1) - 1 )
            eq = delta / return_multiplier
            # eq + 1 = (sqrt(...) - 1)/(range_factor - 1)
            # sqrt(...)= 1 + (eq+1)*(range_factor-1)
            x  = 1 + (eq + 1)*(range_factor - 1)
            if x <= 0:
                raise ValueError(
                    f"Cannot invert delta={delta} in the 'middle region' for a call. "
                    f"Computed x={x} <= 0 => no real solution."
                )
            strike = (x**2 * spot_price) / range_factor
            return strike

    @staticmethod
    def convert_r_to_tick_width(r: float) -> float:
        """Converts range factor to tick width
        
        :r range factor
        """
        return math.log(r, 1.0001)

    @staticmethod
    def calc_sqrtprice_change(df: pd.DataFrame) -> pd.DataFrame:
        """Stores previous sqrt price and calculates delta"""
        df['prev_sqrtPrice'] = df['sqrtPrice'].shift()
        df['change_sqrtPrice'] = df['sqrtPrice'] - df['prev_sqrtPrice']
        return df

    @staticmethod
    def unpack_sqrtprice(sqrt_p: str) -> float:
        """
        Decode a hex-encoded sqrt price from raw UNI V3 pool data

        The hex string is read as an unsigned integer and divided by 2**96.

        :sqrt_p hex string
        """
        raw = int(sqrt_p, 16)
        return raw / (1 << 96)

    @staticmethod
    def get_twos_comp(hex_str: str, bits: int=256) -> int:
        """
        Interpret a hex string as a signed two's-complement integer

        :hex_str hex digits (parsed with int(hex_str, 16))
        :bits word width; the value is negative when bit (bits - 1) is set
        :return signed integer. Input wider than `bits` is not masked first.
        """
        sign_bit = 1 << (bits - 1)
        num = int(hex_str, 16)
        if num & sign_bit:
            # Top bit set: wrap into the negative half of the range
            num -= 1 << bits
        return num

    @staticmethod
    def get_pool_address(token_0: str, token_1: str, fee: int) -> str:
        """
        Look up the Univ3 pool address for a token pair and fee tier

        :token_0 token0 symbol
        :token_1 token1 symbol
        :fee fee tier in bps
        :return pool address, or None when the pool is not in the table below
        """
        # Source: https://info.uniswap.org/#/pools
        # Feel free to add additional pools
        # (make sure token0 and token1 are specified in the same order as on Uniswap!)
        # Ex: '[token0]/[token1] [fee]bps': '[pool_address]',
        known_pools = {
            '1INCH/ETH 100bps': '0xe931b03260b2854e77e8da8378a1bc017b13cb97',
            'AAVE/ETH 30bps': '0x5ab53ee1d50eef2c1dd3d5402789cd27bb52c1bb',
            'APE/ETH 30bps': '0xac4b3dacb91461209ae9d41ec517c2b9cb1b7daf',
            'BIT/ETH 30bps': '0x5c128d25a21f681e678cb050e551a895c9309945',
            'BUSD/USDC 5bps': '0x00cef0386ed94d738c8f8a74e8bfd0376926d24c',
            'cbETH/ETH 5bps': '0x840deeef2f115cf50da625f7368c24af6fe74410',
            'DAI/ETH 5bps': '0x60594a405d53811d3bc4766596efd80fd545a270',
            'DAI/ETH 30bps': '0xc2e9f25be6257c210d7adf0d4cd6e3e881ba25f8',
            'DAI/USDC 1bps': '0x5777d92f208679db4b9778590fa3cab3ac9e2168',
            'DAI/USDC 5bps': '0x6c6bc977e13df9b0de53b251522280bb72383700',
            'DAI/FRAX 5bps': '0x97e7d56a0408570ba1a7852de36350f7713906ec',
            'ETH/BTT 30bps': '0x64a078926ad9f9e88016c199017aea196e3899e1',
            'ETH/ENS 30bps': '0x92560c178ce069cc014138ed3c2f5221ba71f58a',
            'ETH/LOOKS 30bps': '0x4b5ab61593a2401b1075b90c04cbcdd3f87ce011',
            'ETH/sETH2 30bps': '0x7379e81228514a1d2a6cf7559203998e20598346',
            'ETH/USDT 5bps': '0x11b815efb8f581194ae79006d24e0d814b7697f6',
            'ETH/USDT 30bps': '0x4e68ccd3e89f51c3074ca5072bbac773960dfa36',
            'FRAX/USDC 5bps': '0xc63b0708e2f7e69cb8a1df0e1389a98c35a76d52',
            'GNO/ETH 30bps': '0xf56d08221b5942c428acc5de8f78489a97fc5599',
            'HEX/ETH 30bps': '0x9e0905249ceefffb9605e034b534544684a58be6',
            'HEX/USDC 30bps': '0x69d91b94f0aaf8e8a2586909fa77a5c2c89818d5',
            'LDO/ETH 30bps': '0xa3f558aebaecaf0e11ca4b2199cc5ed341edfd74',
            'LINK/ETH 30bps': '0xa6cc3c2531fdaa6ae1a3ca84c2855806728693e8',
            'MATIC/ETH 30bps': '0x290a6a7460b308ee3f19023d2d00de604bcf5b42',
            'MKR/ETH 30bps': '0xe8c6c9227491c0a8156a0106a0204d881bb7e531',
            'SHIB/ETH 30bps': '0x2f62f2b4c5fcd7570a709dec05d68ea19c82a9ec',
            'UNI/ETH 30bps': '0x1d42064fc4beb5f8aaf85f4617ae8b3b5b8bd801',
            'USDC/ETH 1bps': '0xe0554a476a092703abdb3ef35c80e0d76d32939f',
            'USDC/ETH 5bps': '0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640',
            'USDC/ETH 30bps': '0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8',
            'USDC/ETH 100bps': '0x7bea39867e4169dbe237d55c8242a8f2fcdcc387',
            'USDC/USDT 1bps': '0x3416cf6c708da44db2624d63ea0aaef7113527c6',
            'USDC/USDT 5bps': '0x7858e59e0c01ea06df3af3d20ac7b0003275d4bf',
            'USDC/USDM 5bps': '0x8ee3cc8e29e72e03c4ab430d7b7e08549f0c71cc',
            'WBTC/ETH 5bps': '0x4585fe77225b41b697c938b018e2ac67ac5a20c0',
            'WBTC/ETH 30bps': '0xcbcdf9626bc03e24f779434178a73a0b4bad62ed',
            'WBTC/USDC 30bps': '0x99ac8ca7087fa4a2a1fb6357269965a2014abc35',
            'WOOF/ETH 100bps': '0x666ed8c2151f00e7e58b4d941f65a9df68d2245b',
        }

        # Keys are spelled '[token0]/[token1] [fee]bps'; a miss yields None
        fee_label = str(fee)
        return known_pools.get(f"{token_0}/{token_1} {fee_label}bps")

    @staticmethod
    def get_token_dec(token: str) -> int:
        """
        Look up the number of decimals used by a token

        :token token symbol (case-sensitive, e.g. 'cbETH')
        :return number of decimals, or None when the token is not listed
        """
        # Source: https://apiv5.paraswap.io/tokens/?network=1 & https://etherscan.io/tokens
        token_decimals = {
            '1INCH': 18,
            'AAVE': 18,
            'APE': 18,
            'BIT': 18,
            'BTT': 18,
            'BUSD': 18,
            'cbETH': 18,
            'DAI': 18,
            'ENS': 18,
            'ETH': 18,
            'FRAX': 18,
            'GNO': 18,
            'HEX': 8,
            'LDO': 18,
            'LINK': 18,
            'LOOKS': 18,
            'MATIC': 18,
            'MKR': 18,
            'sETH2': 18,
            'SHIB': 18,
            'UNI': 18,
            'USDC': 6,
            'USDM': 18,
            'USDT': 6,
            'WBTC': 8,
            'WOOF': 18,
        }
        return token_decimals.get(token)

    @staticmethod
    def get_tick_spacing(fee: int) -> int:
        """
        Gets Univ3 tick spacing corresponding to fee-tier

        :fee fee-tier in bps
        :return tick spacing, or None for a fee tier that is not listed
        """
        spacing = {
            1: 1,
            5: 10,
            30: 60,
            100: 200
        }
        # Single lookup; an unlisted fee tier yields None
        return spacing.get(fee)

## Plotting

In [3]:
def plot_hist(s: pd.Series, xlabel: str, ylabel: str, title: str, dir: str, bins=100) -> None:
    """
    Plot a histogram of s with its median marked, and save it

    The median is drawn as a dashed white vertical line and written just
    above the top of the y-axis. The figure is saved twice: once to `dir`
    as given and once to `dir` with '.svg' appended.

    :s values to bin
    :xlabel x-axis label
    :ylabel y-axis label
    :title plot title
    :dir output path handed to plt.savefig
    :bins forwarded to plt.hist
    """
    plt.figure(dpi=750)
    plt.hist(s, bins=bins)

    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title, pad=20)

    median = s.median()
    plt.axvline(median, color='w', linestyle='dashed', linewidth=1)

    # Only the upper y-limit is needed: the text goes 1% above it
    y_top = plt.ylim()[1]
    plt.text(median, y_top*1.01, f"Median: {round(median, 1)}", ha='center')

    plt.savefig(dir)
    plt.savefig(f"{dir}.svg")

def plot_hist_all(rets: dict[str, pd.Series], xlabel: str, ylabel: str, title: str, dir: str, bins=100) -> None:
    """
    Plot the histograms of several series on one figure, and save it

    Every series gets a legend entry made of its name and its median
    rounded to one decimal. The figure is saved twice: once to `dir` as given
    and once to `dir` with '.svg' appended.

    :rets series to plot, keyed by the name shown in the legend
    :xlabel x-axis label
    :ylabel y-axis label
    :title plot title
    :dir output path handed to plt.savefig
    :bins forwarded to plt.hist
    """
    plt.figure(dpi=750)
    for series_name in rets:
        series = rets[series_name]
        legend_text = series_name + f', med = {round(series.median(), 1)}'
        plt.hist(series, label=legend_text, bins=bins)

    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title, pad=20)
    plt.legend(prop={'size': 2})

    plt.savefig(dir)
    plt.savefig(f"{dir}.svg")

In [4]:
def plot_cum_rets(rets: dict[str, pd.Series], xlabel: str, ylabel: str, title: str, dir: str, label: bool=False) -> None:
    """
    Plot compounded cumulative returns (in %) for several series, and save it

    Each series r is turned into ((1 + r).cumprod() - 1) * 100 and drawn as a
    coloured line over a slightly wider white one. The x-axis runs from the
    first date of the first series to its last date plus some padding.
    The index must be datetime-like: the code takes `.days` of an index
    difference and adds a timedelta to index values.
    The figure is saved twice: once to `dir` as given and once to `dir` with
    '.svg' appended.

    :rets per-period returns, keyed by the name shown in the legend
    :xlabel x-axis label
    :ylabel y-axis label
    :title plot title
    :dir output path handed to plt.savefig
    :label if true, mark the last point of every line and print its rounded value next to it
    """
    colors = ['#6c74f5', '#7ceac5', '#f305f9', '#ffac6e', '#f8d120','#efefef']
    plt.figure(dpi=750)
    first_series = list(rets.values())[0]
    start_date = first_series.index[0]
    end_date = first_series.index[-1]
    date_range_days = (end_date - start_date).days
    # Horizontal gap between a line's end and its label: 5% of the span, at least 2 days
    x_offset = timedelta(days=max(2, int(date_range_days * 0.05)))

    label_positions = []

    for i, (name, r) in enumerate(rets.items()):
        color = colors[i % len(colors)]
        cum_rets = ((1 + r).cumprod() - 1) * 100
        plt.plot(cum_rets, lw=1, color='w')
        plt.plot(cum_rets, lw=0.75, label=name, color=color)

        if label:
            # Positional access: cum_rets[-1] would be looked up as an index label
            y = cum_rets.iloc[-1]
            # Push the text up by 1 for every earlier label within 1.5 of this value
            y_offset = 0
            for prev_y in label_positions:
                if abs(y - prev_y) < 1.5:
                    y_offset += 1
            label_positions.append(y + y_offset)

            ax = plt.gca()
            x_min, x_max = ax.get_xlim()
            label_x = cum_rets.index[-1] + x_offset
            # get_xlim() returns floats in axis units, so the date has to be
            # converted to the same units before the two can be compared
            if ax.xaxis.convert_units(label_x) > x_max:
                label_x = x_max - (x_max - x_min) * 0.02

            plt.plot(cum_rets.index[-1], y, 'o', zorder=4, ms=2.5, mec='#efefef', color=color)
            plt.text(label_x, y + y_offset, f"{round(y)}%", ha='left', va='center', fontsize=2.5, color=color, clip_on=True)

    plt.xticks(rotation=45)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend(prop={'size': 2})
    plt.xlim([start_date, end_date + x_offset * 1.5])

    plt.savefig(dir)
    plt.savefig(f"{dir}.svg")

In [5]:


def plot_summed_rets(rets: dict[str, pd.Series], xlabel: str, ylabel: str, title: str, dir: str, label: bool = True) -> None:
    """
    Plot summed (non-compounded) cumulative returns in % for several series, and save it

    Each series r is turned into r.cumsum() * 100 and drawn as a coloured
    line over a slightly wider white one. The x-axis runs from the first date
    of the first series to its last date plus some padding.
    The index must be datetime-like: the code takes `.days` of an index
    difference and adds a timedelta to index values.
    The figure is saved twice: once to `dir` as given and once to `dir` with
    '.svg' appended.

    :rets per-period returns, keyed by the name shown in the legend
    :xlabel x-axis label
    :ylabel y-axis label
    :title plot title
    :dir output path handed to plt.savefig
    :label if true, mark the last point of every line and print its rounded value next to it
    """
    colors = ['#6c74f5', '#7ceac5', '#f305f9', '#ffac6e', '#f8d120', '#efefef']
    plt.figure(dpi=750)

    first_series = list(rets.values())[0]
    start_date = first_series.index[0]
    end_date = first_series.index[-1]
    date_range_days = (end_date - start_date).days
    # Horizontal gap between a line's end and its label: 5% of the span, at least 2 days
    x_offset = timedelta(days=max(2, int(date_range_days * 0.05)))

    label_positions = []

    for i, (name, r) in enumerate(rets.items()):
        color = colors[i % len(colors)]
        cum_rets = r.cumsum() * 100
        plt.plot(cum_rets, lw=1, color='w')
        plt.plot(cum_rets, lw=0.75, label=name, color=color)

        if label:
            # Positional access: cum_rets[-1] would be looked up as an index label
            y = cum_rets.iloc[-1]
            # Push the text up by 1 for every earlier label within 1.5 of this value
            y_offset = 0
            for prev_y in label_positions:
                if abs(y - prev_y) < 1.5:
                    y_offset += 1
            label_positions.append(y + y_offset)

            ax = plt.gca()
            x_min, x_max = ax.get_xlim()
            label_x = cum_rets.index[-1] + x_offset
            # get_xlim() returns floats in axis units, so the date has to be
            # converted to the same units before the two can be compared
            if ax.xaxis.convert_units(label_x) > x_max:
                label_x = x_max - (x_max - x_min) * 0.02

            plt.plot(cum_rets.index[-1], y, 'o', ms=2.5, mec='#efefef', color=color)
            plt.text(label_x, y + y_offset, f"{round(y)}%", ha='left', va='center', fontsize=2.5, color=color, clip_on=True)

    plt.xticks(rotation=45)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend(prop={'size': 3})
    plt.xlim([start_date, end_date + x_offset * 1.5])

    plt.savefig(dir)
    plt.savefig(f"{dir}.svg")


In [6]:
def strat_name(strat: LP_Rebalance) -> str:
    """
    Describe a strategy in parentheses: token pair, fee tier in bps, time window and range

    :strat strategy whose token_0, token_1, fee_rate, start_t, end_t and range_perc are read
    """
    pair = f"{strat.token_0}-{strat.token_1}"
    fee_bps = int(strat.fee_rate * 10_000)
    window = f"{strat.start_t} - {strat.end_t}"
    return f"({pair} {fee_bps} bps, {window}, r = {strat.range_perc})"

def strat_file(strat: LP_Rebalance) -> str:
    """
    Build a file-name stem that identifies a strategy

    The stem is made of the token pair, the fee tier in bps, the time window
    and the range written as (range_perc - 1) * 100.

    :strat strategy whose token_0, token_1, fee_rate, start_t, end_t and range_perc are read
    """
    # Round before converting to int: float products such as (1.15 - 1) * 100
    # come out just below the whole number and int() alone would cut them to 14
    fee_bps = int(round(strat.fee_rate * 10_000))
    range_pct = int(round((strat.range_perc - 1) * 100))
    return f"{strat.token_0}-{strat.token_1}-{fee_bps}bps-{strat.start_t}-{strat.end_t}-r={range_pct}"

def strat_label(strat: LP_Rebalance, reb_freq: str) -> str:
    """
    Build a plot label from the token pair, fee tier in bps, range and rebalancing frequency

    The range is shown as range_perc * 100, rounded to a whole number.

    :strat strategy whose token_0, token_1, fee_rate and range_perc are read
    :reb_freq rebalancing frequency, inserted as given
    """
    # Round before converting to int: 1.15 * 100 is 114.99999999999999 in
    # floating point and int() alone would cut it to 114
    fee_bps = int(round(strat.fee_rate * 10_000))
    range_scaled = int(round(strat.range_perc * 100))
    return f"{strat.token_0}-{strat.token_1} {fee_bps} bps (r = {range_scaled}, reb = {reb_freq})"

def strat_label_2(strat: LP_Rebalance) -> str:
    """
    Build a short plot label showing only the strategy's range

    :strat strategy whose range_perc is read
    """
    r = strat.range_perc
    return f"r = {r}"

In [11]:
def plot_series_as_bar(
    series: pd.Series,
    xlabel: str,
    ylabel: str,
    title: str,
    output_path: str,
    rotate_xticks: int = 45
) -> None:
    """
    Plot a series as an annotated bar chart, and save it

    The values are multiplied by 100 and the first entry is left out before
    plotting. Every bar is annotated with its value to one decimal.
    The figure is saved twice: once to `output_path` as given and once to
    `output_path` with '.svg' appended.

    :series values to plot
    :xlabel x-axis label
    :ylabel y-axis label
    :title plot title
    :output_path output path handed to plt.savefig
    :rotate_xticks rotation of the x tick labels, in degrees
    """
    plt.figure(dpi=650)

    # NOTE(review): the first entry is dropped without explanation; presumably
    # it is a placeholder for the initial period -- confirm against callers
    premia = (series * 100)[1:]
    ax = premia.plot(kind="bar", rot=rotate_xticks, width=0.8)

    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)

    # Write each value at x = 0, 1, 2, ... anchored at the bar height
    for bar_pos, height in enumerate(premia):
        ax.text(bar_pos, height, f"{height:.1f}", ha="center", va="bottom", fontsize=8)

    plt.tight_layout()
    plt.savefig(output_path)
    plt.savefig(f"{output_path}.svg")



# Strategy Stats

In [8]:
def strat_summary(s: pd.Series) -> pd.Series:
    """
    Get summary statistics for a strategy's per-period returns

    Units differ by row: 'Sharpe Ratio' and 'Win Rate' are plain ratios, the
    three 'Cum Ret' rows are compounded returns in percent, and every other
    row is in bps. An empty input gives NaN for every row.

    :s per-period returns as fractions (0.01 = 1%)
    :return statistics keyed by name
    """
    BPS = 10_000
    n_obs = len(s)
    gains = s[s > 0]
    losses = s[s < 0]

    # NOTE(review): the sqrt(365) factor presumably assumes one return per
    # calendar day -- confirm against callers
    sharpe = (s.mean() / s.std()) * (365 ** 0.5)

    cum_rets = ((1 + s).cumprod() - 1) * 100
    # Positional access: cum_rets[-1] is looked up as a label, which raises
    # KeyError on an integer index and is deprecated on other index types
    cum_ret = cum_rets.iloc[-1] if n_obs else float('nan')
    win_rate = len(gains) / n_obs if n_obs else float('nan')

    stats = {'Sharpe Ratio': sharpe,
             'Cum Ret': cum_ret,
             'Max Cum Ret': cum_rets.max(),
             'Min Cum Ret': cum_rets.min(),
             'Avg Ret': s.mean() * BPS,
             'Med Ret': s.median() * BPS,
             'Max Loss': s.min() * BPS,
             'Avg Loss': losses.mean() * BPS,
             'Med Loss': losses.median() * BPS,
             'Win Rate': win_rate,
             'Avg Gain': gains.mean() * BPS,
             'Med Gain': gains.median() * BPS,
             'Max Gain': s.max() * BPS
    }

    return pd.Series(stats)