In [1003]:
import pprint
import json
import requests
import pandas as pd
from websocket import WebSocketApp

In [1004]:
info = {}
cols = ['price', 'quantity']

In [1005]:
def init_info(ticker: str) -> None:
    global info
    
    info[ticker] = {
        'asks': None,
        'bids': None,        
        'event_id': 0,
        'lastUpdateId' : 0,        
        'u_anterior': 0
        }

In [1006]:
def validate_snapshot(msg: dict) -> bool:
    if msg.get('lastUpdateId', False) and msg.get('asks', False) and msg.get('bids', False):
        if len(msg['asks']) == 1_000 and len(msg['bids']) == 1_000:
            return True
    
    return False

In [1007]:
def create_dataframe(data:dict) -> pd.DataFrame:
    df = pd.DataFrame(data, columns=cols)
    df.set_index('price', inplace=True)
    df.sort_index(ascending=False, inplace=True)
    df.index = df.index.astype('float64')
    df['quantity'] = df['quantity'].astype("float64")
    return df

In [1008]:
def get_snapshot(ticker: str) -> None:
    global info
    url = "https://fapi.binance.com/fapi/v1/depth"
    params = {"symbol": ticker.upper(), "limit": 1000}
    response = requests.get(url, params=params)
    response = response.json()
    
    if not validate_snapshot(response):
        raise ValueError("No se pudo Obtener el Snapshot Correctamente")

    info[ticker]['lastUpdateId'] =  response['lastUpdateId']
    info[ticker]['asks'] = create_dataframe(response['asks'])
    info[ticker]['bids'] = create_dataframe(response['bids'])
    

In [1009]:
def create_event(data:dict) -> dict:
    
    data = data.get("data", {})
    u = data.get('u',0)
    U = data.get('U',0)
    pu = data.get('pu',0)
    asks = create_dataframe(data.get('a',[]))
    bids = create_dataframe(data.get('b',[]))
    
    return {
        'u': u,
        'U': U,
        'pu': pu,
        'asks': asks,
        'bids': bids
    }

In [1010]:
def on_close(ws):
    ws.close()

In [1011]:
# esta funcion sera la encargada de actualizar el order book del snapshot obtenido
def process_event(ticker: str, new_asks:dict, new_bids:dict):
    
    # datos del snapshot
    asks_df = info[ticker]['asks']
    bids_df = info[ticker]['bids']

    #print(asks_df)
    # datos a actualizar
    new_asks_df = new_asks
    new_bids_df = new_bids

    
    print(f"{'+'*50}")
    print("ACTUALIZANDO ORDER BOOK")
    print(f"{'+'*50}")

    # Actualizamos los datos que están en ambos DataFrames, con esto se modifica asks_df y bids_df
    asks_df.update(new_asks_df)
    #print(asks_df)
    bids_df.update(new_bids_df)
    #print(bids_df)
    # Ahora pasamos los datos que no están en asks_df ni en bids_df pero si en los nuevos
    # Empezamos con asks para ver el proceso paso a paso

    # Combinamos los datos, tomando asks_df como los datos que permaneces y agregando los de new_asks_df
    asks_df = asks_df.combine_first(new_asks_df)
    
    # Obtenemos el DataFrame con la condición para obtener todos los precios con cantidad cero
    condition =  asks_df[asks_df['quantity'] == 0]    

    # Usamos el método drop, usando la condición y, a esta, obtenemos el indice para poder eliminar
    asks_df = asks_df.drop(condition.index)
    
    # Hacemos lo mismo para los bids
    bids_df = bids_df.combine_first(new_bids_df)
    condition =  bids_df[bids_df['quantity'] == 0]
    bids_df = bids_df.drop(condition.index)

    info[ticker]['asks'] = asks_df
    info[ticker]['bids'] = bids_df

    # Hasta aqui tenemos la actualización constante del Order Book


In [1012]:
def on_message(ws, msg, ticker):
    global info
    # aqui llega el mensaje como string
    # lo pasamos a diccionairo
    # Hata aqui tenemos el diccionario de la info
    data = json.loads(msg) 
    
    # Obtenemos datos del evento
    pu_actual = data['data']['pu']
    u_actual = data['data']['u']
    U = data['data']['U']
    
    new_asks = create_dataframe(data['data']['a'])
    new_bids = create_dataframe(data['data']['b'])
    u_anterior = info[ticker]['u_anterior']
    asks = info[ticker]['asks']
    bids = info[ticker]['bids']
    
    
    # Obtener Snapshot siempre que (pu actual != a anterior) y reiniciamos event_id
    #print("u actual: ", u_actual)
    #print("pu_actual: ",pu_actual)
    #print("u_atnerior: ",u_anterior)
    #print(f"{'+'*50}")

    if pu_actual != u_anterior:
        get_snapshot(ticker)        
        info[ticker]['event_id'] = 0        
        print("Se obtuvo Snapshot ya que pu != u_anterior")
        info[ticker]['u_anterior'] = u_actual
    
    else:
        # Para procesar el evento tiene que ser u>= lastUpdateId
        lastUpdateId = info[ticker]['lastUpdateId']
        #print("Verificando si el evento es procesable")
        if u_actual >= lastUpdateId:            
            # El primer dato a procesar debe tener U <= lastUpdateId AND u >= lastUpdateId
            # Solo aumentaremos el número del evento por primera vez aqui
            if info[ticker]['event_id'] == 0 and U<=lastUpdateId and u_actual>= lastUpdateId:
                #print("Se procesa evento: ", info[ticker]['event_id'])
                info[ticker]['u_anterior'] = u_actual
                info[ticker]['event_id'] += 1
                process_event(ticker, new_asks, new_bids)
                

            # Este if solo entrará cuando el evento sea mayor a 0, es decir, hasta el segundo evento
            elif info[ticker]['event_id'] > 0:
                #print("Se procesa evento: ", info[ticker]['event_id'])
                info[ticker]['u_anterior'] = u_actual
                info[ticker]['event_id'] += 1
                process_event(ticker, new_asks, new_bids)
        else:
            info[ticker]['u_anterior'] = u_actual
    
    # Hacemos que el evento anterior sea ahora el que fue nuestro evento actual

In [1013]:
def start_ws(ticker:str):
    ws_url = f"wss://fstream.binance.com/stream?streams={ticker.lower()}@depth"
    # Aquí necesitamos enviar la dirección de memoria de on_message pera necesitamos enviar el
    # ticker como argumento. Esto lo logramos con una función anómina (lambda) 
    ws = WebSocketApp(url=ws_url, on_message=lambda ws, msg: on_message(ws, msg, ticker), on_close=on_close)
    ws.run_forever() 

In [1014]:
def main():
    global info
    
    ticker = "btcusdt"
    # 1. Iniciamos la variable que guarda la información
    init_info(ticker)
        
    # 2. Iniciamos Websocket
    start_ws(ticker)


In [1015]:
main()

Se obtuvo Snapshot ya que pu != u_anterior
++++++++++++++++++++++++++++++++++++++++++++++++++
ACTUALIZANDO ORDER BOOK
++++++++++++++++++++++++++++++++++++++++++++++++++
         quantity
price            
16817.0    26.114
16817.1     0.118
16817.2     0.571
16817.3     0.641
16817.4     0.457
...           ...
16919.4     2.268
16919.7     0.004
16919.8     4.093
16919.9     0.021
17247.7     2.479

[1001 rows x 1 columns]
         quantity
price            
1000.0    110.155
1681.7      0.011
16312.5     0.181
16477.0     9.954
16531.0   125.882
...           ...
16816.5     0.695
16816.6     0.069
16816.7     0.005
16816.8     3.204
16816.9    26.293

[1006 rows x 1 columns]
++++++++++++++++++++++++++++++++++++++++++++++++++
ACTUALIZANDO ORDER BOOK
++++++++++++++++++++++++++++++++++++++++++++++++++
         quantity
price            
16817.0    26.108
16817.1     0.118
16817.2     0.571
16817.3     0.641
16817.4     0.457
...           ...
16919.8     4.093
16919.9     0.021
17168.4