In [1]:
#%pip install -qqq python-binance pandas websocket-client sqlalchemy psycopg2-binary fastapi uvicorn matplotlib


In [2]:
import json
import logging
import os
import sys

import pandas
import websocket
from binance.client import Client
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

In [3]:
# Send root-logger records to stdout as bare messages so they appear inline
# in the notebook output.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

if not logger.hasHandlers():
    # Attach only once, so re-running this cell does not duplicate every line.
    handler = logging.StreamHandler(stream=sys.stdout)
    handler.setFormatter(logging.Formatter(fmt='%(message)s'))
    logger.addHandler(handler)

logging.info("Libraries imported successfully")

Libraries imported successfully


In [4]:
# Binance credentials come from the environment so they never live in the notebook.
api_key = os.environ["BINANCE_API_KEY"]
api_secret = os.environ["BINANCE_SECRET"]
client = Client(api_key, api_secret, testnet=True)

# Instrument, and the raw-trade stream URL derived from it.
symbol = "BTCUSDT"
# NOTE(review): `client` is created with testnet=True, but this stream URL is the
# production endpoint, so REST and WebSocket prices may come from different
# markets — confirm this is intended.
socket = "wss://stream.binance.com:9443/ws/{}@trade".format(symbol.lower())

Fetch the latest price from the REST API and one near-real-time trade price from the WebSocket stream, so the two sources can be compared side by side.


In [5]:
# One-shot comparison: latest REST ticker price vs. the first trade seen on the stream.
ticker = client.get_symbol_ticker(symbol=symbol)  # use the configured symbol, not a hard-coded one
df_current_prices = pandas.DataFrame([ticker], columns=["symbol", "price"])
df_current_prices["price"] = df_current_prices["price"].astype(float)
# Pre-create the column so the frame keeps a stable schema even if no trade
# arrives (e.g. the connection fails) before run_forever() returns.
df_current_prices["websocket_price"] = float("nan")


def on_message_single(ws, message):
    """Store the price of the first streamed trade, then close the socket.

    ``message`` is the raw JSON text of a trade event; its ``"p"`` field holds
    the trade price.
    """
    msg = json.loads(message)
    df_current_prices["websocket_price"] = float(msg["p"])
    ws.close()
    logging.info("Received message and closed connection!")


def on_error_single(ws, error):
    """Log WebSocket errors instead of letting them pass silently."""
    logging.error(f"WebSocket error: {error}")


web = websocket.WebSocketApp(
    socket,
    on_message=on_message_single,
    on_error=on_error_single,
)
web.run_forever()
df_current_prices = df_current_prices.rename(columns={"price": "rest_api_price"})
df_current_prices.head()

Websocket connected
Received message and closed connection!


Unnamed: 0,symbol,rest_api_price,websocket_price
0,BTCUSDT,90611.57,90611.57


In [None]:
SYMBOL = symbol.lower()  # lower-case stream form of the configured symbol
MAX_TICKS = 20_000  # number of trades to collect before the stream is closed
counter = 0
ticks = []


def on_message_stream(ws, message):
    """Record one trade event, and close the socket once ``MAX_TICKS`` have arrived.

    Each tick keeps:
    - the trade time (``"T"``, epoch milliseconds) as a UTC timestamp;
    - the price (``"p"``);
    - the traded quantity (``"q"``).

    The quantity lets the aggregation report traded volume, like the REST
    klines' ``volume`` column, instead of a trade count.
    """
    global counter  # rebound below; `ticks` is only mutated, so it needs no global
    data = json.loads(message)
    ticks.append({
        "ts": pandas.to_datetime(data["T"], unit="ms", utc=True),
        "price": float(data["p"]),
        "qty": float(data["q"]),
    })
    counter += 1
    if counter == MAX_TICKS:
        ws.close()
        logging.info(f"Received {counter} messages, closing WebSocket")


def on_error_stream(ws, error):
    """Surface WebSocket errors in the notebook output."""
    logging.error(f"WebSocket error: {error}")


web = websocket.WebSocketApp(
    socket,
    on_message=on_message_stream,
    on_error=on_error_stream,
)
web.run_forever()


Websocket connected


Aggregate the ticks received from the WebSocket into 1-minute bars.

In [None]:
# Turn the raw trade ticks into one row per minute.
df_ticks = pandas.DataFrame(ticks)
if df_ticks.empty:
    raise ValueError("No WebSocket ticks collected; run the streaming cell first")
df_ticks = df_ticks.sort_values("ts").set_index("ts")

# 'volume' should mean traded quantity, which is what the REST klines' 'volume' holds.
# Fall back to the trade count only when the ticks were collected without quantities.
if "qty" in df_ticks.columns:
    volume_agg = ("qty", "sum")
else:
    logging.warning("Ticks have no 'qty' field; 'volume' falls back to the number of trades")
    volume_agg = ("price", "count")

df_web_socket = (
    df_ticks.resample("1min")
    .agg(
        highest=("price", "max"),
        lowest=("price", "min"),
        # Last trade of the minute is the analogue of the REST candle close,
        # so both data sources store the same quantity in 'price'.
        price=("price", "last"),
        volume=volume_agg,
    )
    # resample() emits a row for every minute in range, including minutes with no trades.
    .dropna(subset=["price"])
)
# NOTE: the first and last bars usually cover only part of a minute (the stream
# starts and stops mid-minute), so they are not fully comparable to REST candles.
df_web_socket["ma_10"] = df_web_socket["price"].rolling(10, min_periods=1).mean()
df_web_socket["ma_20"] = df_web_socket["price"].rolling(20, min_periods=1).mean()
df_web_socket["instrument"] = SYMBOL.upper()
df_web_socket["data_source"] = "WEBSOCKET"
df_web_socket = df_web_socket.reset_index()[
    ["ts", "instrument", "price", "volume", "highest", "lowest", "ma_10", "ma_20", "data_source"]
]
df_web_socket.tail()


In [None]:
interval = Client.KLINE_INTERVAL_1MINUTE
limit = 8  # amount of minutes to keep from the REST API
# ma_20 needs 19 earlier candles. With only `limit` candles, min_periods=1 silently
# turns ma_10/ma_20 into short expanding means, so fetch extra history and trim it afterwards.
ma_warmup = 19

klines = client.get_klines(symbol=symbol, interval=interval, limit=limit + ma_warmup)
columns = [
    "open_time", "open", "high", "low", "close", "volume",
    "close_time", "quote_asset_volume", "trades",
    "taker_buy_base", "taker_buy_quote", "ignore"
]

df_rest = pandas.DataFrame(klines, columns=columns)
df_rest["open_time"] = pandas.to_datetime(df_rest["open_time"], unit="ms", utc=True)
numeric_cols = ["open", "high", "low", "close", "volume"]
df_rest[numeric_cols] = df_rest[numeric_cols].astype(float)

# NOTE(review): the last kline is likely the still-open current minute, so its
# close/volume may be provisional — confirm whether it should be dropped.
df_rest = (
    df_rest.set_index("open_time")
    .assign(
        ma_10=lambda d: d["close"].rolling(10, min_periods=1).mean(),
        ma_20=lambda d: d["close"].rolling(20, min_periods=1).mean(),
    )
    .tail(limit)  # drop the warm-up candles now that the averages are computed
    .assign(instrument=symbol.upper(), data_source="REST_API")
    .loc[:, ["instrument", "close", "volume", "high", "low", "ma_10", "ma_20", "data_source"]]
    .reset_index()
    .rename(columns={"open_time": "ts", "close": "price", "high": "highest", "low": "lowest"})
)
df_rest.tail()

Build a comparison DataFrame that aligns the REST API candles with the WebSocket bars minute by minute.

In [None]:
# Align the two sources on the minute timestamp. Both frames carry a fresh
# RangeIndex after reset_index(), so an index join would pair unrelated minutes.
compare_df = df_web_socket.merge(
    df_rest[["ts", "price", "highest", "lowest", "ma_10", "ma_20"]],
    on="ts",
    how="inner",
    suffixes=("_ws", "_rest"),
)

if compare_df.empty:
    logging.info("No overlapping minutes between WebSocket and REST data; nothing to plot")
else:
    try:
        compare_df.plot(
            x="ts",
            y=["price_ws", "price_rest"],
            title="WebSocket vs REST price per minute",
        )
    except ImportError:
        # pandas plotting needs matplotlib; the rest of the notebook does not.
        logging.info("Plotting skipped: matplotlib is not available")

In [None]:
user = os.environ["POSTGRE_USER"]
password = os.environ["POSTGRE_PASS"]
# Connection target; overridable via the environment, defaulting to the original values.
host = os.environ.get("POSTGRE_HOST", "c3a9wim9rq.gyxknhzigc.tsdb.cloud.timescale.com")
port = int(os.environ.get("POSTGRE_PORT", 33547))
database = os.environ.get("POSTGRE_DB", "tsdb")
# URL.create escapes the credentials. An f-string URL breaks, or mis-parses, when the
# password contains characters such as '@', ':' or '/'.
db_url = URL.create(
    "postgresql+psycopg2",
    username=user,
    password=password,
    host=host,
    port=port,
    database=database,
)
engine = create_engine(db_url)

In [None]:
def write_to_db(df, table_name, con=None):
    """Append the rows of ``df`` to ``table_name``.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to insert; the index is not written.
    table_name : str
        Target table. ``to_sql`` with ``if_exists="append"`` creates it if missing.
    con : optional
        Connectable passed to ``DataFrame.to_sql`` (SQLAlchemy engine/connection
        or a sqlite3 connection). Defaults to the notebook-level ``engine``.

    Rows are always appended, so re-running the notebook inserts duplicates.
    Empty frames are skipped, so no empty table is created.
    """
    if con is None:
        con = engine
    if df.empty:
        logging.info(f"No rows to write to table {table_name}; skipped")
        return
    df.to_sql(
        table_name,
        con,
        if_exists="append",
        index=False
    )
    logging.info(f"Data written to table {table_name} successfully")
# Both sources share the `trades` table; data_source tells their rows apart.
for frame in (df_web_socket, df_rest):
    write_to_db(frame, "trades")


In [None]:
query = """
SELECT *
FROM trades
WHERE instrument = :instrument
  AND ts BETWEEN :start AND :end
ORDER BY ts
"""


def _parse_utc(value):
    """Parse a user-entered datetime string into a UTC-aware ``pandas.Timestamp``.

    Naive input is taken as UTC, because the ``ts`` values this notebook writes
    are UTC. Input carrying an offset is converted to UTC.
    Raises ValueError for empty or unparseable input.
    """
    ts = pandas.Timestamp(value)
    if pandas.isna(ts):
        raise ValueError(f"Invalid datetime: {value!r}")
    return ts.tz_localize("UTC") if ts.tzinfo is None else ts.tz_convert("UTC")


instrument = input("Enter instrument (e.g., BTCUSDT): ").strip().upper()
start = input("Enter start datetime (YYYY-MM-DD HH:MM): ").strip()
end = input("Enter end datetime (YYYY-MM-DD HH:MM): ").strip()

# Validate up-front, so a typo fails here with a clear message instead of a DB error.
start_ts = _parse_utc(start)
end_ts = _parse_utc(end)
if start_ts > end_ts:
    raise ValueError(f"Start {start_ts} is after end {end_ts}")

# Timezone-aware datetimes make the range unambiguous. Plain strings would be
# interpreted in the database session's time zone.
df = pandas.read_sql(
    text(query),
    engine,
    params={
        "instrument": instrument,
        "start": start_ts.to_pydatetime(),
        "end": end_ts.to_pydatetime()
    }
)

df.head()