In [2]:
# A constant product Automated Market Maker, like that of Uniswap v1
# Ensures pool can always accommodate trades, albeit with slippage for larger trades
# Does not feature fees or liquidity provider (LP) tokens

class V1_AMM: 
    def __init__(self, a_quantity, b_quantity):
        self.a_quantity = a_quantity
        self.b_quantity = b_quantity
        self.constant_product = self.a_quantity * self.b_quantity
        print('Created a V1_AMM pool with {} of asset A and {} of asset B\n'.format(self.a_quantity, self.b_quantity))
        # allocate initial LP tokens proportional to sqrt constant prod
        # self.total_pool_lp_tokens = ...
        # self.own_lp_tokens = ...
    
    def trade_a(self, a_amount):
        if a_amount > self.a_quantity:
            print('Amount {} of asset A too large to trade'.format(a_amount))
            return
        initial_ratio_b_on_a = self.b_quantity / self.a_quantity
        self.a_quantity += a_amount
        old_b_quantity = self.b_quantity
        self.b_quantity = self.constant_product / self.a_quantity # update quantity using constant prod
        b_amount_returned = old_b_quantity - self.b_quantity
        b_amount_expected_at_initial_ratio = initial_ratio_b_on_a * a_amount
        slippage = b_amount_expected_at_initial_ratio - b_amount_returned
        print('Got {} B, expected {} B from {} A, slippage of {} B or {}%'.format(b_amount_returned, b_amount_expected_at_initial_ratio, a_amount, f"{slippage:.4g}", (slippage/b_amount_expected_at_initial_ratio) * 100)) # shows slippage

    def trade_b(self, b_amount):
        if b_amount > self.b_quantity:
            print('Amount {} of asset B too large to trade'.format(b_amount))
            return
        initial_ratio_a_on_b = self.a_quantity / self.b_quantity
        self.b_quantity += b_amount
        old_a_quantity = self.a_quantity
        self.a_quantity = self.constant_product / self.b_quantity # update quantity using constant prod
        a_amount_returned = old_a_quantity - self.a_quantity
        a_amount_expected_at_initial_ratio = initial_ratio_a_on_b * b_amount
        slippage = a_amount_expected_at_initial_ratio - a_amount_returned
        print('Got {} A from {} B, expected {} A, slippage of {} A or {}%'.format(a_amount_returned, b_amount, a_amount_expected_at_initial_ratio, f"{slippage:.4g}", (slippage/a_amount_expected_at_initial_ratio) * 100)) # shows slippage

    def log_state(self):
        print('Quantity A: {}   Quantity B: {}\n'.format(self.a_quantity, self.b_quantity))

    def provide_liquidity(self, a_amount, b_amount): # returns lp tokens
        # ratio of a_amount / b_amount must match current pool ratio
        # TODO
        pass

    

In [3]:
# simple V1_AMM demo (no fees, no lp tokens)

my_amm = V1_AMM(a_quantity=10000, b_quantity=10000)

my_amm.trade_b(500)

# TODO demo impermanent loss

Created a V1_AMM pool with 10000 of asset A and 10000 of asset B

Got 476.19047619047706 A from 500 B, expected 500.0 A, slippage of 23.81 A or 4.761904761904589%


In [9]:
# A constant product Automated Market Maker with concentrated liquidity, like that of Uniswap v3
# logic simplified from https://uniswap.org/whitepaper-v3.pdf (only supports one concentrated liquidity position, no fees, no NFT representing unique LP position, no oracle...)
# 'ticks' represent discrete prices of width 1 basis point

from math import sqrt, log, floor
from collections import defaultdict
    
class Simple_V3_AMM:
    # initializes v3 amm, creates one position
    def __init__(self, initial_sqrt_price):
        ## Global State (L, root(P), curr_tick, tick_bitmap)
        self.liquidity = 0
        self.sqrt_price = initial_sqrt_price
        self.curr_tick = None
        self.tick_bitmap = defaultdict(lambda: 0) # tick bitmap

        ## Tick-indexed State: tick_index => {l_net, l_gross}
        self.ticks = defaultdict(lambda: None)

        ## Position-indexed State: (username, tick_low, tick_high) => {"liquidity": val}
        self.positions = defaultdict(lambda: None)
        print('Created a Simple_V3_AMM pool')

    # computes liquidity received for a given d_a and price range
    def get_dl_for_da(self, d_a, sqrt_price_low, sqrt_price_high):
        if (sqrt_price_low > sqrt_price_high): sqrt_price_low, sqrt_price_high = sqrt_price_high, sqrt_price_low # safety
        return d_a * ((sqrt_price_high * sqrt_price_low)/(sqrt_price_high - sqrt_price_low))
        
    # computes liquidity received for a given d_b and price range
    def get_dl_for_db(self, d_b, sqrt_price_low, sqrt_price_high):
        if (sqrt_price_low > sqrt_price_high): sqrt_price_low, sqrt_price_high = sqrt_price_high, sqrt_price_low # safety
        return d_b / (sqrt_price_high - sqrt_price_low)
    
    # calculates position liquidity given d_a, d_b, and a price range, assumes current price within price range
    # adds position to global state
    def add_position(self, d_a, d_b, sqrt_price_low, sqrt_price_high): 
        if (sqrt_price_low > sqrt_price_high): sqrt_price_low, sqrt_price_high = sqrt_price_high, sqrt_price_low # safety
        lower_tick = floor(log(sqrt_price_low, sqrt(1.0001)))
        upper_tick = floor(log(sqrt_price_high, sqrt(1.0001)))

        self.tick_bitmap[lower_tick] = 1 # mark tick endpoints as initialized
        self.tick_bitmap[upper_tick] = 1

        dl_a = self.get_dl_for_da(d_a, self.sqrt_price, sqrt_price_high)
        dl_b = self.get_dl_for_db(d_b, sqrt_price_low, self.sqrt_price)
        d_l = min(dl_a, dl_b) # liquidity in position, min

        # update state (assumes price within price range)
        self.sqrt_price = sqrt(d_b / d_a)
        self.liquidity = d_l
        self.curr_tick = floor(log(self.sqrt_price, sqrt(1.0001)))
        self.ticks[lower_tick] = {"l_net": d_l, "l_gross": "??"} # gross liquidity is used to uninitialize ticks when no positions refer to them, won't matter for our simple demo
        self.ticks[upper_tick] = {"l_net": -d_l, "l_gross": "??"}
        self.positions[("SampleUser", lower_tick, upper_tick)] = {"liquidity": d_l}

    # assumes only one concentrated liquidity position is active (i.e. doesn't cross activated ticks (yet))
    def trade_b(self, d_b):
        # find next initialized tick (tick crossing not implemented yet)
        next_initialized_tick = self.curr_tick
        while self.tick_bitmap[next_initialized_tick] == 0:
            next_initialized_tick += 1
        # price change must not move over next_initialized_tick implied price, # each tick has d_L which will act on self.liquidity when the tick is crossed (tick crossing not implemented yet)
        d_sqrt_price = d_b / self.liquidity
        new_sqrt_price = self.sqrt_price + d_sqrt_price
        new_curr_tick = floor(log(new_sqrt_price, sqrt(1.0001)))
        if new_curr_tick > next_initialized_tick:
            print('[ERROR] Trade moved price too high, from {} to {}'.format(self.sqrt_price**2, new_sqrt_price**2)) # debug
            return
        d_inv_sqrt_price = (1/new_sqrt_price) - (1/self.sqrt_price)
        d_a = d_inv_sqrt_price * self.liquidity
        a_amount_expected_at_initial_ratio = 1/(self.sqrt_price**2) * d_b
        slippage = a_amount_expected_at_initial_ratio - (-1 * d_a)
        print('Got {} A from {} B, expected {} A, slippage of {} A or {}%'.format(-1 * d_a, d_b, a_amount_expected_at_initial_ratio, f"{slippage:.4g}", (slippage/a_amount_expected_at_initial_ratio) * 100)) # shows slippage
        self.sqrt_price += d_sqrt_price # update global price
        return -1 * d_a # d_a will be the amount of A lost by the increase in B
    
    def log_state(self):
        print("liquidity: " + str(self.liquidity))
        print("price: " + str(self.sqrt_price**2))

In [10]:
# simple V3_AMM demo (one position, no fees)

amm = Simple_V3_AMM(initial_sqrt_price=1)

# begin with 10000 of each asset in a narrow position
amm.add_position(d_a=10000, d_b=10000, sqrt_price_low=sqrt(0.5), sqrt_price_high=sqrt(1.5))

# Swap 500 units of asset B for asset A
amm.trade_b(d_b=500)
print('-----')

Created a Simple_V3_AMM pool
Got 492.78335485099626 A from 500 B, expected 500.0 A, slippage of 7.217 A or 1.4433290298007477%
-----
