In [17]:
'''
Web Socket sequence.

1. Fetch initial order book with REST API
    Store the order book locally
2. Fetch incremental updates with WebSocket (starting with the lastUpdateId fetched from the REST API)
    Check if the lastUpdateId matches the previous one
3. Update the local order book.

4. If WebSocket disconnects or misses some data (lastupdateid is for control):
    - Udate the local order book with REST API
    - continue with WebSocket
'''

'\nWeb Socket sequence.\n\n1. Fetch initial order book with REST API\n    Store the order book locally\n2. Fetch incremental updates with WebSocket (starting with the lastUpdateId fetched from the REST API)\n    Check if the lastUpdateId matches the previous one\n3. Update the local order book.\n\n4. If WebSocket disconnects or misses some data (lastupdateid is for control):\n    - Udate the local order book with REST API\n    - continue with WebSocket\n'

In [19]:
# Import required libraries
from sqlalchemy import Column, Float, String, DateTime, Integer, create_engine, UniqueConstraint
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker

# Required imports
import ccxt
import time
from datetime import datetime

from rich.console import Console
from rich.table import Table
from rich.style import Style
from IPython.display import clear_output

import websocket
import json
import threading

In [21]:
# Cell 1: DB manager

# Create base class for declarative models
Base = declarative_base()

class Density(Base):
    """SQLAlchemy model for storing order book densities"""
    __tablename__ = 'densities'
    
    pair = Column(String, primary_key=True, nullable=False)
    price = Column(Float, primary_key=True, nullable=False)
    side = Column(String, nullable=False)
    size = Column(Float, nullable=False)
    worth = Column(Float, nullable=False)
    spread_price = Column(Float, nullable=False)
    timestamp = Column(DateTime, default=datetime.utcnow)
    
    __table_args__ = (
        UniqueConstraint('pair', 'price', name='uix_pair_price'),
    )
    
    def __repr__(self):
        return f"<Density(pair={self.pair}, side={self.side}, price={self.price}, "
        f"size={self.size}, worth={self.worth}, spread_price={self.spread_price}, time={self.timestamp})>"

class DatabaseManager:
    def __init__(self, session_factory):
        self.Session = session_factory

    def fetch_densities(self, pair=None):
        """Fetch all densities for a specific pair."""
        session = self.Session()
        try:
            condition = Density.pair == pair if pair else True
            densities = session.query(Density).filter(condition).order_by(Density.pair.asc(), Density.price.desc()).all()
            # densities = query.order_by(Density.pair.asc(), Density.price.desc()).all()
            return densities
        except Exception as e:
            print(f"Error fetching densities: {e}")
            return []
        finally:
            session.close()

    def update_density(self, density, side=None, size=None, worth=None, spread_price=None):
        """Update an existing density."""
        session = self.Session()
        existing_density = session.query(Density).filter_by(pair=density.pair, side=density.side, price=density.price).first()
        try:
            if side is not None:
                existing_density.side = side
            if size is not None:
                existing_density.size = size
            if worth is not None:
                existing_density.worth = worth
            if spread_price is not None:
                existing_density.spread_price = spread_price
            session.commit()
        except Exception as e:
            session.rollback()
            print(f"Error updating density: {e}")
        finally:
            session.close()

    def delete_density(self, density):
        """Delete a density."""
        session = self.Session()
        try:
            session.delete(density)
            session.commit()
        except Exception as e:
            session.rollback()
            print(f"Error deleting density: {e}")
        finally:
            session.close()

    def add_density(self, pair, side, price, size, worth, spread_price):
        """Add a new density to the database."""
        session = self.Session()
        try:
            density = Density(
                pair=pair,
                side=side,
                price=price,
                size=size,
                worth=worth,
                spread_price=spread_price,
                timestamp=datetime.utcnow()
            )
            session.add(density)
            session.commit()
        except Exception as e:
            session.rollback()
            print(f"Error adding density: {e}")
        finally:
            session.close()


In [23]:
# Create db_manager

engine = create_engine("sqlite:///densities_ws.db")  # Replace with your actual database URI
Session = sessionmaker(bind=engine)
db_manager = DatabaseManager(session_factory=Session)

In [13]:
import websocket
import json
import threading
import time

In [55]:
class BinanceWebSocketManager:
    def __init__(self, symbols, depth_level=100):
        """
        Initialize the Binance WebSocket Manager.
        :param symbols: List of symbols to subscribe to.
        :param depth_level: Number of levels of order book depth to store.
        """
        self.exchange = ccxt.binance({
            #'enableRateLimit': True,  # Enable rate limiting
        })
        self.symbols = symbols
        self.depth_level = depth_level
        self.url = "wss://stream.binance.com:9443/ws"
        self.connection = None

        # Store full order books and their last update ID
        self.order_books = {symbol: {'bids': [], 'asks': [], 'lastUpdateId': None} for symbol in self.symbols}
        self.thread = None
        self.running = False

    def initialize_order_book(self, symbol):
        """Fetch the initial full order book via REST API."""
        try:
            # Fetch the full order book
            order_book = self.exchange.fetch_order_book(symbol, limit=1000)
            last_update_id = order_book.get("lastUpdateId")
            
            # Store the initial order book and last update ID
            self.order_books[symbol] = {
                "bids": order_book["bids"],
                "asks": order_book["asks"],
                "lastUpdateId": last_update_id,
            }
            print(f"Initialized order book for {symbol} with lastUpdateId {last_update_id}.")
        except Exception as e:
            print(f"Error initializing order book for {symbol}: {e}")
    
    def on_message(self, ws, message):
        """Process incoming WebSocket messages."""
        try:
            data = json.loads(message)
            if 's' in data:  # Check for valid symbol
                symbol = data['s'].lower()
                if symbol not in self.order_books:
                    return

                # Extract the update fields
                last_update_id = self.order_books[symbol]["lastUpdateId"]
                U = data["U"]  # First update ID in the stream
                u = data["u"]  # Final update ID in the stream
                b = data["b"]  # Bids updates
                a = data["a"]  # Asks updates

                # Skip updates until the WebSocket syncs with the REST API data
                if last_update_id and u <= last_update_id:
                    return
                if last_update_id and U > last_update_id + 1:
                    print(f"Missed updates for {symbol}. Reinitializing order book.")
                    self.initialize_order_book(symbol)
                    return
                
                # Apply updates to the local order book
                self.update_order_book(symbol, b, a)
                self.order_books[symbol]["lastUpdateId"] = u
                print(f"Updated order book for {symbol} up to {u}.")

            else:
                print(f"Unknown message format: {data}")
        except Exception as e:
            print(f"Error processing message: {e}")

    def update_order_book(self, symbol, bids, asks):
        """Apply incremental updates to the local order book."""
        order_book = self.order_books[symbol]

        # Update bids
        for price, size in bids:
            price = float(price)
            size = float(size)
            if size == 0:
                # Remove the price level
                order_book["bids"] = [level for level in order_book["bids"] if level[0] != price]
            else:
                # Update or add the price level
                for i, (p, s) in enumerate(order_book["bids"]):
                    if p == price:
                        order_book["bids"][i] = [price, size]
                        break
                else:
                    order_book["bids"].append([price, size])
                    order_book["bids"].sort(reverse=True)  # Keep bids sorted in descending order

        # Update asks
        for price, size in asks:
            price = float(price)
            size = float(size)
            if size == 0:
                # Remove the price level
                order_book["asks"] = [level for level in order_book["asks"] if level[0] != price]
            else:
                # Update or add the price level
                for i, (p, s) in enumerate(order_book["asks"]):
                    if p == price:
                        order_book["asks"][i] = [price, size]
                        break
                else:
                    order_book["asks"].append([price, size])
                    order_book["asks"].sort()  # Keep asks sorted in ascending order

    
    def on_error(self, ws, error):
        """Handle errors."""
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """Handle WebSocket closure."""
        print("WebSocket closed")

    def on_open(self, ws):
        """Subscribe to order book streams for all symbols."""
        try:
            params = [f"{symbol.lower()}@depth@100ms" for symbol in self.symbols]
            subscribe_message = {
                "method": "SUBSCRIBE",
                "params": params,
                "id": 1
            }
            ws.send(json.dumps(subscribe_message))
            print("Subscribed to WebSocket streams!")
        except Exception as e:
            print(f"Error during subscription: {e}")

    def start(self):
        """Start the WebSocket connection."""
        if self.running:
            print("WebSocket is already running.")
            return

        for symbol in self.symbols:
            self.initialize_order_book(symbol)
        self.connection = websocket.WebSocketApp(
            self.url,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open
        )        
        # Run the WebSocket in a separate thread
        self.thread = threading.Thread(target=self.connection.run_forever, daemon=True)
        self.thread.start()
        self.running = True
        print("WebSocket started.")

    def stop(self):
        """Stop the WebSocket connection."""
        if not self.running:
            print("WebSocket is not running.")
            return

        if self.connection:
            self.connection.close()
        if self.thread:
            self.thread.join()
        self.running = False
        print("WebSocket stopped.")

    def get_order_book(self, symbol):
        """Fetch the latest order book for a symbol."""
        return self.order_books.get(symbol, None)

In [57]:
# Example usage
symbols = [ 'ADAUSDT'] # 'BTCUSDT', 'ETHUSDT',
websocket_manager = BinanceWebSocketManager(symbols, depth_level=100)
websocket_manager.start()

time.sleep(7)

# Fetch order book for BTCUSDT
for symbol in symbols:
    print(f"{symbol}: {websocket_manager.get_order_book(symbol)}\n")
    time.sleep(1)

# Stop the WebSocket manager
websocket_manager.stop()

Initialized order book for ADAUSDT with lastUpdateId None.
WebSocket started.
Subscribed to WebSocket streams!
Unknown message format: {'result': None, 'id': 1}
ADAUSDT: {'bids': [[0.9233, 10632.3], [0.9232, 11153.3], [0.9231, 25400.2], [0.923, 15242.4], [0.9229, 25549.1], [0.9228, 27592.6], [0.9227, 59400.2], [0.9226, 26021.8], [0.9225, 17882.7], [0.9224, 29446.5], [0.9223, 30622.9], [0.9222, 32956.0], [0.9221, 31665.9], [0.922, 57101.0], [0.9219, 27430.8], [0.9218, 62033.2], [0.9217, 79437.1], [0.9216, 67191.8], [0.9215, 20073.2], [0.9214, 50328.7], [0.9213, 26722.6], [0.9212, 22655.6], [0.9211, 25946.6], [0.921, 27343.8], [0.9209, 26189.9], [0.9208, 5343.9], [0.9207, 9192.4], [0.9206, 28487.2], [0.9205, 36752.0], [0.9204, 30331.6], [0.9203, 19920.5], [0.9202, 12142.1], [0.9201, 11418.9], [0.92, 11432.2], [0.9199, 10500.5], [0.9198, 7112.1], [0.9197, 3539.3], [0.9196, 1711.2], [0.9195, 18033.4], [0.9194, 6611.7], [0.9193, 5122.5], [0.9192, 1451.3], [0.9191, 1527.9], [0.919, 15564.7],

In [11]:
websocket_manager.stop()

WebSocket stopped.


In [137]:
# Cell 3 (Main Tracking Class):

class BinanceDensityTracker:
    def __init__(self, exchange, pairs, depth_level=100, value_thresholds, price_range_threshold, db_manager):
        """
        Initialize BinanceDensityTracker.

        Args:
            exchange: CCXT exchange instance.
            pairs: List of trading pairs to track (e.g., ['BTC/USDT', 'ETH/USDT']).
            depth_level: Depth of the order book to fetch.
            value_thresholds:
            price_range_threshold:
            db_manager: object to work with the database
        """
        self.exchange = exchange
        self.pairs = pairs
        self.depth_level = depth_level
        self.value_thresholds = value_thresholds
        self.price_range_threshold = price_range_threshold
        self.db_manager = db_manager

        # Initialize WebSocket manager for real-time order books
        self.ws_manager = BinanceWebSocketManager(
            symbols=[pair.replace("/", "").lower() for pair in pairs],
            depth_level=depth_level
        )

        # Cache for storing initial and updated order books
        self.order_books = {}

    def _initial_fetch(self):
        """
        Perform an initial fetch of all order books via REST API.
        """
        print("Fetching initial order books via REST API...")
        for pair in self.pairs:
            try:
                order_book = self.exchange.fetch_order_book(pair, limit=self.depth_level)
                self.order_books[pair] = order_book
                print(f"Fetched {pair}: {len(order_book['bids'])} bids, {len(order_book['asks'])} asks")
            except Exception as e:
                print(f"Error fetching order book for {pair}: {e}")
    
    
    def _process_pair_density(self, pair):
        """
        Process densities for a specific trading pair.

        Args:
            pair: The trading pair (e.g., 'BTC/USDT').
        """
        value_threshold = self.value_thresholds.get(pair.split('/')[0], 
                                                    self.value_thresholds['default'])

        # Step 1: Fetch order book
        print(f"calling {pair}")
        order_book = self.exchange.fetch_order_book(pair, limit=1000)
        asks = order_book['asks']
        bids = order_book['bids']

        max_ask_price = asks[-1][0] if asks else float('inf')
        min_ask_price = asks[0][0] if asks else 0
        max_bid_price = bids[0][0] if bids else float('inf')
        min_bid_price = bids[-1][0] if bids else 0
        # print(f"max_ask: {max_ask_price}, min_ask: {min_ask_price}, max_bid: {max_bid_price}, min_bid: {min_bid_price}")

        # Step 2: Identify new densities in the order book
        new_densities = []
        for price, size in asks:
            worth = price * size
            if worth > value_threshold:
                new_densities.append({
                    "pair": pair,
                    "side": 'Ask',
                    "price": price,
                    "size": size,
                    "worth": worth,
                    "spread_price": min_ask_price
                })

        for price, size in bids:
            worth = price * size
            if worth > value_threshold:
                new_densities.append({
                    "pair": pair,
                    "side": 'Bid',
                    "price": price,
                    "size": size,
                    "worth": worth,
                    "spread_price": max_bid_price
                })

        # Step 3: Fetch old densities for the pair from DB
        old_densities = self.db_manager.fetch_densities(pair)

        # Step 4: Compare old densities with new densities
        for old_density in old_densities:
            old_price = old_density.price
            old_side = old_density.side
            old_worth = old_density.worth
            old_spread_price = old_density.spread_price
            
            # (a) Update existing densities
            matching_new_density = next(
                (d for d in new_densities 
                 if d["price"] == old_price and d["side"] == old_side), 
                None
            )
            if matching_new_density:
                self.db_manager.update_density(
                    old_density,
                    matching_new_density["side"],
                    matching_new_density["size"],
                    matching_new_density["worth"],
                    matching_new_density["spread_price"]
                )
                new_densities.remove(matching_new_density)
                continue

            # (b) Keep old densities in the price range
            in_price_range = (
                (old_price > max_ask_price and old_price < min_ask_price * (1 + self.price_range_threshold) and old_side == 'Ask') or
                (old_price < min_bid_price and old_price > max_bid_price * (1 - self.price_range_threshold) and old_side == 'Bid')
            )
            if in_price_range and old_worth >= value_threshold:
                new_spread_price = min_ask_price if old_side == 'Ask' else max_bid_price
                # print(f"Updating spread_price from {old_density.spread_price} to {new_spread_price} for {old_density.pair}")
                self.db_manager.update_density(
                    old_density,
                    size=old_density.size,
                    worth=old_density.worth,
                    spread_price=new_spread_price
                )
                continue

            # (c) Remove densities not matching any condition
            self.db_manager.delete_density(old_density)

        # Add new densities that didn't match any old density
        for new_density in new_densities:
            self.db_manager.add_density(
                pair=new_density["pair"],
                side=new_density["side"],
                price=new_density["price"],
                size=new_density["size"],
                worth=new_density["worth"],
                spread_price=new_density["spread_price"]
            )

        # session.close()

    def display_all_densities(self):
        """Display all densities in a table."""

        # Define background styles for highlighting
        ask_highlight_style = Style(bgcolor="#ffe4e1")  # Light red background for "Ask"
        bid_highlight_style = Style(bgcolor="#f0fff0")  # Light green background for "Bid"

        # session = self.db_manager.Session()
        try:
            # densities = session.query(Density).order_by(Density.pair.asc(), Density.price.desc()).all()
            densities = self.db_manager.fetch_densities()
            # Create a Rich table
            table = Table(title="Big Order Densities", show_header=True, header_style="bold magenta")
            
            # Add columns to the table
            table.add_column("Pair", justify="center")
            table.add_column("Side", justify="center")
            table.add_column("Price", justify="right")
            table.add_column("Size", justify="right")
            table.add_column("Worth ($)", justify="right")
            table.add_column("Distance (%)", justify="right")
            table.add_column("Detected", justify="right")
            
            # Add rows to the table
            for density in densities:
                # Calculate the relative time for the "detected" column
                time_difference = datetime.utcnow() - density.timestamp
                total_minutes = int(time_difference.total_seconds() // 60)
                hours, minutes = divmod(total_minutes, 60)
                
                if hours > 0:
                    relative_time = f"{hours}h {minutes}m" if minutes > 0 else f"{hours}h"
                else:
                    relative_time = f"{minutes}m" if minutes > 0 else "now"
            
                row_style = None
                # Highlight rows that are older than 30 minutes
                highlight = total_minutes >= 30
                if highlight:
                    row_style = ask_highlight_style if density.side == "Ask" else bid_highlight_style

                distance = (density.price - density.spread_price) / density.spread_price * 100
                # Add row to the table with appropriate styles
                table.add_row(
                    density.pair,  # Pair
                    density.side,  # Side
                    f"{density.price:,.4f}",  # Price
                    f"{density.size:,.0f}",  # Size
                    f"{density.worth:,.0f}",  # Worth
                    f"{distance:,.2f}%",  # Distance
                    relative_time,  # Relative Detected time
                    style=row_style,
                )
            
            # Display the Rich table
            console.print(table)
        except Exception as e:
            print(f"Error displaying densities: {e}")
        finally:
            # session.close()
            pass

    def run(self, pairs):
        """Process densities for all pairs and display results."""
        while True:  # Infinite loop to fetch data repeatedly
    
            # Clear previous output in the notebook cell
            clear_output(wait=True)
            time.sleep(1)
            # Step 5: Display all densities           
            self.display_all_densities()
            for pair in pairs:
                self._process_pair_density(pair)
                
            time.sleep(15)


In [129]:
# Cell 2 (Configuration):

# Initialize the Binance exchange instance
exchange = ccxt.binance({
    #'enableRateLimit': True,  # Enable rate limiting
})

# Create a console instance
console = Console()

In [139]:
# Configuration
symbols = ['ADA', 'AAVE', 'ALGO', 'AVAX', 'ACX', 'BONK', 'BNB', 
           'BTC', 'DYDX', 'ETH', 'DOGE', 'FET', 'FIL', 'GRT', 'HBAR', 'ICP', 'INJ',
           'LINK', 'MINA',  'PEOPLE', 'RENDER',
           'SHIB', 'SOL', 'TON', 'VET', 'WLD', 'XAI', 'XRP', 'XLM']

# 'ONDO',
pairs = [f"{symbol}/USDT" for symbol in symbols]

price_range_threshold = 0.10  # 10% threshold
value_thresholds = {
    'default': 300000,
    'BNB': 1000000,
    'DOGE': 1000000,
    'ETH': 1000000,
    'XRP': 1000000,
    'SOL': 1000000,
    'BTC': 1000000,
    'FIL': 200000
}

In [141]:
# Cell 4: create tracker object

# Create and start the tracker
tracker = BinanceDensityTracker(
    exchange=exchange,
    db_manager=db_manager,
    value_thresholds=value_thresholds,
    price_range_threshold=price_range_threshold
)

In [143]:
# Track densities

tracker.run(pairs)

calling ADA/USDT
calling AAVE/USDT
calling ALGO/USDT
calling AVAX/USDT
calling ACX/USDT
calling BONK/USDT
calling BNB/USDT
calling BTC/USDT
calling DYDX/USDT
calling ETH/USDT
calling DOGE/USDT
calling FET/USDT
calling FIL/USDT
calling GRT/USDT
calling HBAR/USDT
calling ICP/USDT
calling INJ/USDT
calling LINK/USDT
calling MINA/USDT
calling PEOPLE/USDT
calling RENDER/USDT
calling SHIB/USDT
calling SOL/USDT
calling TON/USDT
calling VET/USDT
calling WLD/USDT


KeyboardInterrupt: 