In [None]:
from collections import deque
from itertools import chain
import heapq as hq
import numpy as np

rng = np.random.default_rng(seed=42)

class Trade:
    """Record of a single match between a bid order and an ask order.

    Attributes:
        bid_order: the buy-side order taking part in the trade.
        ask_order: the sell-side order taking part in the trade.
        price: price at which the trade executed.
        volume: quantity exchanged.
    """

    def __init__(self, bid_order, ask_order, price, volume):
        # Attribute names match the constructor parameters; both __repr__ and
        # the order-book notification code read `bid_order` / `ask_order`.
        self.bid_order = bid_order
        self.ask_order = ask_order
        self.price = price
        self.volume = volume

    def __repr__(self):
        return f"Trade(bid_id={self.bid_order.id}, ask_id={self.ask_order.id}, price={self.price}, volume={self.volume})"
    
class Order:
    """A buy or sell order with a unique, monotonically increasing id.

    Args:
        price: limit price of the order (``None`` by default).
        volume: quantity still to be traded.
        is_bid: ``True`` for a buy order, ``False`` for a sell order.
        is_market: ``True`` if the order is a market order. It sits before
            ``trader_id`` so positional calls of the form
            ``Order(price, volume, is_bid, is_market, trader_id)`` work.
        trader_id: identifier of the submitting trader, or ``None``.
    """

    # Shared across all instances; supplies the next order id.
    _id_counter = 0

    def __init__(self, price=None, volume=100, is_bid=True, is_market=False, trader_id=None):
        self.price = price
        self.volume = volume
        self.is_bid = is_bid
        # __repr__ and the matching code read this flag, so it must always exist.
        self.is_market = is_market
        self.id = Order._id_counter
        self.trader_id = trader_id

        Order._id_counter += 1

    def __repr__(self):
        side = "BID" if self.is_bid else "ASK"
        type_ = "MARKET" if self.is_market else "LIMIT"
        return f"Order(id={self.id}, price={self.price}, volume={self.volume}, side={side}, type={type_}, trader_id={self.trader_id})"
    
class MarketOrder(Order):
    """Order that trades against whatever price is available.

    The price is set to +inf for a buy and -inf for a sell so that any
    price comparison against resting orders succeeds.

    Args:
        volume: quantity to trade.
        is_bid: ``True`` for a buy order, ``False`` for a sell order.
        trader_id: identifier of the submitting trader, or ``None``.
    """

    def __init__(self, volume=100, is_bid=True, trader_id=None):
        price = float('inf') if is_bid else -float('inf')
        super().__init__(price, volume, is_bid, trader_id=trader_id)
        # Flag the order explicitly as a market order: the order book reads
        # `is_market` to decide whether leftover volume may rest on the book.
        self.is_market = True



In [68]:
class PriceLevel:
    """First-in, first-out queue of the orders resting at a single price."""

    def __init__(self, price):
        self.price = price
        self.orders = deque()

    def add(self, order):
        """Queue ``order`` behind every order already resting at this price."""
        self.orders.append(order)

    def top(self):
        """Return the oldest resting order without removing it, or ``None``."""
        return self.orders[0] if self.orders else None

    def pop(self):
        """Remove and return the oldest resting order.

        Raises:
            IndexError: if no orders rest at this level.
        """
        if self.orders:
            return self.orders.popleft()
        raise IndexError("Price level is empty.")

    def fill(self, order):
        """Match ``order`` against the resting orders, oldest first.

        Volumes of ``order`` and of the matched resting orders are reduced
        in place. Matching stops when ``order`` has no volume left or the
        level is empty.

        Returns:
            A ``(trades, filled_orders)`` tuple: the trades generated at this
            price, and the resting orders that were removed from the level.
        """
        executed = []
        exhausted = []
        incoming_is_bid = order.is_bid

        while order.volume > 0 and not self.is_empty():
            resting = self.top()

            # The trade is capped by whichever side has less volume left.
            matched = min(order.volume, resting.volume)

            # A resting order no larger than the incoming one leaves the queue.
            if resting.volume <= order.volume:
                exhausted.append(self.pop())

            resting.volume -= matched
            order.volume -= matched

            # Work out which participant is the buyer and which the seller.
            if incoming_is_bid:
                buyer, seller = order, resting
            else:
                buyer, seller = resting, order
            executed.append(Trade(buyer, seller, self.price, matched))

        return executed, exhausted

    def cancel(self, order):
        """Remove ``order`` from the level.

        Raises:
            ValueError: if ``order`` is not resting at this level.
        """
        try:
            self.orders.remove(order)  # linear scan of the queue
        except ValueError:
            raise ValueError("Order not found at this price level.")

    def is_empty(self):
        """Return ``True`` when no orders rest at this level."""
        return not self.orders

    def __repr__(self):
        resting = list(self.orders)
        return f"PriceLevel(price={self.price}, orders={resting})"

    def __str__(self):
        count = len(self.orders)
        volume = sum(resting.volume for resting in self.orders)
        return f"PriceLevel: Price={self.price}, Volume={volume}, Orders={count}"


In [69]:
class PriceBook:
    """One side (bids or asks) of an order book.

    Orders are grouped into price levels. A heap of prices gives access to
    the best level: the highest price on the bid side, the lowest on the ask
    side. Levels emptied by cancellation are cleaned up lazily the next time
    the best price is requested.

    Attributes:
        order_map: maps order id to the orders currently resting on this side.
        is_bid_side: ``True`` for the bid side, ``False`` for the ask side.
    """

    def __init__(self, is_bid_side):
        self.order_map = {}
        self._price_levels = {}
        self._heap = []
        self.is_bid_side = is_bid_side

    def add(self, order):
        """Rest ``order`` at its price, creating the price level if needed."""
        price = order.price

        if price not in self._price_levels:
            self._price_levels[price] = PriceLevel(price)
            # heapq is a min-heap, so bid prices are negated to get the
            # highest bid at the top.
            hq.heappush(self._heap, -price if self.is_bid_side else price)

        self._price_levels[price].add(order)
        self.order_map[order.id] = order

    def cancel(self, order):
        """Remove a resting ``order`` from this side.

        Raises:
            ValueError: if the order's price level does not exist or the
                order is not resting at that level.
        """
        price = order.price
        if price not in self._price_levels:
            raise ValueError(f"Price {price} not found in PriceBook.")

        try:
            # A level left empty stays in the heap for now; removing it from
            # the middle of the heap would need a linear scan, so clean-up is
            # deferred to best_price().
            self._price_levels[price].cancel(order)
        except ValueError as e:
            raise ValueError(f"Failed to cancel order {order.id}: {e}") from e

        del self.order_map[order.id]

    def best_price(self):
        """Return the best non-empty price on this side, or ``None``.

        Empty price levels found at the top of the heap are discarded.
        """
        while self._heap:
            price = -self._heap[0] if self.is_bid_side else self._heap[0]
            if not self._price_levels[price].is_empty():
                return price
            # Level was emptied by fills or cancellations: drop it.
            hq.heappop(self._heap)
            del self._price_levels[price]
        return None

    def fill(self, order):
        """Match an incoming ``order`` from the opposite side against this side.

        Levels are consumed from the best price outward for as long as the
        price is compatible with ``order.price`` and ``order`` has volume left.

        Returns:
            The list of trades generated.

        Raises:
            ValueError: if ``order`` is on the same side as this book.
        """
        if order.is_bid == self.is_bid_side:
            raise ValueError("Cannot fill an order on the same side of the PriceBook.")

        price = order.price
        trades = []
        best_price = self.best_price()

        # Compare against None explicitly: a best price of 0 is a valid price
        # and must not be mistaken for "no liquidity".
        while best_price is not None and order.volume > 0:
            # A resting bid must be at least the incoming ask price; a resting
            # ask must be at most the incoming bid price.
            can_fill = (best_price >= price) if self.is_bid_side else (best_price <= price)
            if not can_fill:
                break

            trades_at_price, orders_filled = self._price_levels[best_price].fill(order)
            trades.extend(trades_at_price)
            for filled in orders_filled:
                del self.order_map[filled.id]

            # Removes the level if it is now empty and finds the next best price.
            best_price = self.best_price()

        return trades

    def __repr__(self):
        return f"PriceBook(is_bid_side={self.is_bid_side}, price_levels={sorted(self._price_levels)})"

    def display(self):
        """Print every price level still tracked by the heap, highest price first.

        Levels emptied by cancellation but not yet cleaned up are printed too.
        """
        print(f"{'BID' if self.is_bid_side else 'ASK'} PriceBook:")
        # Bid prices are stored negated, so ascending heap order is already
        # descending price order; ask prices need the sort reversed.
        for heap_price in sorted(self._heap, reverse=not self.is_bid_side):
            price = -heap_price if self.is_bid_side else heap_price
            print(self._price_levels[price])

In [70]:
class Asset:
    """Named asset whose value moves by a random normal step.

    Attributes:
        name: label of the asset.
        initial_value: value the asset was created with.
        value: current value.
    """

    def __init__(self, name, initial_value=100):
        self.name = name
        self.value = initial_value
        self.initial_value = initial_value

    def evolve_value(self, dift=0, sigma=0.5):
        """Add one normal draw to the current value.

        ``dift`` is passed as the mean of the draw and ``sigma`` as its
        standard deviation; the draw comes from the module-level ``rng``.
        """
        step = rng.normal(dift, sigma)
        self.value += step

    def __repr__(self):
        name, value = self.name, self.value
        return f"Asset(name={name}, value={value})"

    def __str__(self):
        current, initial = self.value, self.initial_value
        return f"Asset: {self.name}, Current Value: {current}, Initial Value: {initial}"
    
class TradesNotification:
    """Summary of the trades executed for one order, addressed to one trader.

    Args:
        trader_id: identifier of the trader being notified.
        order: the order the trades belong to. Its ``volume`` at construction
            time is recorded as the remaining (unfilled) volume.

    Attributes:
        price_volume: maps each execution price to the total volume filled
            at that price.
        num_trades: number of trades added so far.
        is_filled: ``True`` if the order had no volume left at construction.
    """

    def __init__(self, trader_id, order):
        self.trader_id = trader_id
        self.order_id = order.id

        self.price_volume = {}
        self.num_trades = 0

        self._remaining_volume = order.volume
        self.is_filled = order.volume == 0

    @property
    def remaining_volume(self):
        """Volume the order still had open when the notification was created."""
        return self._remaining_volume

    @property
    def total_filled_volume(self):
        """Total volume across all recorded trades."""
        return sum(self.price_volume.values())

    @property
    def average_price(self):
        """Volume-weighted average execution price, or ``None`` with no fills."""
        filled = self.total_filled_volume
        if not filled:
            # Nothing traded yet: avoid dividing by zero.
            return None
        weighted_sum = sum(price * volume for price, volume in self.price_volume.items())
        return weighted_sum / filled

    def add_trade(self, trade):
        """Record ``trade`` and return this notification.

        Returning ``self`` lets the call be chained onto a freshly built
        notification.
        """
        self.price_volume[trade.price] = self.price_volume.get(trade.price, 0) + trade.volume
        self.num_trades += 1
        return self

    def __repr__(self):
        status = "FILLED" if self.is_filled else "PARTIAL"
        average = self.average_price
        # average_price is None until at least one trade has been recorded.
        average_text = "n/a" if average is None else f"{average:.2f}"
        return (f"TradesNotification(order_id={self.order_id}, "
                f"filled {self.total_filled_volume}@{average_text} avg, "
                f"{self.num_trades} trades, remaining={self.remaining_volume}, {status})")

In [80]:
class OrderBook:
    """Two-sided order book made of a bid PriceBook and an ask PriceBook.

    Attributes:
        bids: the buy side of the book.
        asks: the sell side of the book.
        buy_volume, sell_volume: per-timestep counters, zeroed by
            ``reset_volumes``.
    """

    def __init__(self):
        self.bids = PriceBook(is_bid_side=True)
        self.asks = PriceBook(is_bid_side=False)
        # Make sure the volume counters exist before the first reset.
        self.reset_volumes()

    @property
    def best_bid(self):
        """Highest resting bid price, or ``None`` if there are no bids."""
        return self.bids.best_price()

    @property
    def best_ask(self):
        """Lowest resting ask price, or ``None`` if there are no asks."""
        return self.asks.best_price()

    @property
    def spread(self):
        """Best ask minus best bid rounded to 2 decimals, or ``None`` if a side is empty."""
        bid, ask = self.best_bid, self.best_ask
        if bid is None or ask is None:
            return None
        return round(ask - bid, 2)

    @property
    def mid_price(self):
        """Midpoint of best bid and best ask rounded to 2 decimals, or ``None`` if a side is empty."""
        bid, ask = self.best_bid, self.best_ask
        if bid is None or ask is None:
            return None
        return round((ask + bid) / 2, 2)

    def process_orders(self, orders):
        """Match each order against the opposite side, in the order given.

        Any volume left on a non-market order is rested on its own side.

        Returns:
            A dict mapping order id to the ``TradesNotification`` for every
            order that traded and has a trader id.
        """
        trades = []
        for order in orders:
            opposite, own = (self.asks, self.bids) if order.is_bid else (self.bids, self.asks)
            trades.extend(opposite.fill(order))

            # Only add remaining volume to the book if it is a limit order.
            if order.volume and not order.is_market:
                own.add(order)

        return self._process_trades(trades)

    def _process_trades(self, trades):
        """Group ``trades`` into one notification per participating order.

        Orders without a trader id are skipped.
        """
        order_notifs = {}
        for trade in trades:
            for order in (trade.bid_order, trade.ask_order):
                if order.trader_id is None:
                    continue
                notif = order_notifs.get(order.id)
                if notif is None:
                    notif = TradesNotification(order.trader_id, order)
                    order_notifs[order.id] = notif
                # Store the notification itself; do not rely on add_trade's
                # return value.
                notif.add_trade(trade)

        return order_notifs

    def unfilled_orders(self, trader_id):
        """Return ``(id, price, volume)`` for each resting order of ``trader_id``.

        Asks are listed before bids.
        """
        resting = chain(self.asks.order_map.values(), self.bids.order_map.values())
        return [
            (order.id, order.price, order.volume)
            for order in resting
            if order.trader_id == trader_id
        ]

    # A helper function to reset the trading volumes at each timestep
    def reset_volumes(self):
        self.buy_volume = 0
        self.sell_volume = 0

    def cancel(self, order_id):
        """Cancel the resting order with ``order_id`` on whichever side holds it.

        Raises:
            KeyError: if no resting order has that id.
        """
        for side in (self.bids, self.asks):
            order = side.order_map.get(order_id)
            if order is not None:
                side.cancel(order)
                return
        raise KeyError(f"Order with id {order_id} could not be found in either side of the order book.")

    def __repr__(self):
        return f"OrderBook(bids={self.bids!r}, asks={self.asks!r})"

    def display(self):
        """Print the bid side followed by the ask side."""
        self.bids.display()
        self.asks.display()

In [None]:
### Sanity check: a bid at 100 and an ask at 120 do not cross, so no trade
### should occur and both orders should rest on the book.
ob = OrderBook()

# Arguments are passed positionally as (price, volume, is_bid, <flag>, trader_id).
# NOTE(review): the fourth value (False) appears to be meant as an is_market
# flag -- verify against Order.__init__'s parameter list.
o1 = Order(100, 150, True, False, 1)
o2 = Order(120, 100, False, False, 1)
ob.process_orders([o1,o2])
ob.display()

# Default-constructed order; it is not submitted to the book in this cell.
m1 = Order()

BID PriceBook:
PriceLevel: Price=100, Volume=150, Orders=1
ASK PriceBook:
PriceLevel: Price=120, Volume=100, Orders=1
