In [None]:
import asyncio
import json
import ssl
import websockets
import aiohttp
import logging
from decimal import Decimal
from typing import Dict
logging.basicConfig(filename='orderbook.log', level=logging.INFO,
                    format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
class BinanceOrderbookListener:
    def __init__(self, instrument: str):
        self.instrument = instrument
        self.instrument_name = instrument.lower()
        self.snapshot_required = True
        self.snapshot_update_id = 0
        self.pu = 0
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE
    async def get_snapshot(self):
        url = f"https://fapi.binance.com/fapi/v1/depth?symbol={self.instrument_name.upper()}&limit=10"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()
    def parse_restapi_response(self, response: Dict) -> Dict:
        return {
            "instrument": self.instrument,
            "exchange": "binance",
            "asks": [(Decimal(price), Decimal(quantity)) for price, quantity in response["asks"]],
            "bids": [(Decimal(price), Decimal(quantity)) for price, quantity in response["bids"]]
        }
    async def get_order_book_data(self, data: Dict) -> Dict:
        curr_pu = int(data["pu"])
        upper_u = int(data["U"])
        lower_u = int(data["u"])
        if self.pu == 0:
            if self.snapshot_required:
                self.snapshot_required = False
                try:
                    snapshot = await self.get_snapshot()
                    self.snapshot_update_id = snapshot["lastUpdateId"]
                    return self.parse_restapi_response(snapshot)
                except Exception as e:
                    logging.error(f"Error getting snapshot: {e}")
                    return {}
            if upper_u > self.snapshot_update_id or lower_u < self.snapshot_update_id:
                logging.info(f"Skipping pre-snapshot message: last snapshot_id: {self.snapshot_update_id}, upper_u: {upper_u}, lower_u: {lower_u}")
                return {}
        if curr_pu != self.pu and self.pu != 0:
            self.snapshot_required = True
            self.pu = 0
            self.snapshot_update_id = 0
            logging.info("Binance OB snapshot required")
            return {}
        self.pu = int(data["u"])
        return {
            "instrument": self.instrument,
            "exchange": "binance",
            "asks": [(Decimal(price), Decimal(quantity)) for price, quantity in data["a"]],
            "bids": [(Decimal(price), Decimal(quantity)) for price, quantity in data["b"]]
        }
    async def handle_message(self, message: str):
        try:
            data = json.loads(message)
            channel = data["stream"]
            event_data = data["data"]
            if channel == f"{self.instrument_name}@depth":
                orderbook = await self.get_order_book_data(event_data)
                if orderbook:
                    logging.info(f"Orderbook: {json.dumps(orderbook, default=str)}")
        except Exception as e:
            logging.error(f"Error handling message: {e}")
            logging.error(f"Message: {message}")
    async def subscribe(self):
        url = f"wss://fstream.binance.com/stream?streams={self.instrument_name}@depth"
        while True:
            try:
                async with websockets.connect(url, ssl=self.ssl_context) as websocket:
                    logging.info(f"Connected to {url}")
                    while True:
                        message = await websocket.recv()
                        await self.handle_message(message)
            except Exception as e:
                logging.error(f"WebSocket error: {e}")
                await asyncio.sleep(5)  # Wait before attempting to reconnect
# In a Jupyter Notebook, you can directly use 'await' instead of 'asyncio.run()'
listener = BinanceOrderbookListener("BTCUSDT")
await listener.subscribe()

## Modified

In [16]:
import asyncio
import json
import ssl
import websockets
import aiohttp
import logging
from decimal import Decimal
from typing import Dict
from datetime import datetime, timedelta
import pandas as pd  # Import pandas for Parquet

# Set up logging to include milliseconds
logging.basicConfig(
    filename='orderbook.log',
    level=logging.INFO,
    format='%(asctime)s.%(msecs)03d - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

class BinanceOrderbookListener:
    def __init__(self, instrument: str):
        self.instrument = instrument
        self.instrument_name = instrument.lower()
        self.snapshot_required = True
        self.snapshot_update_id = 0
        self.pu = 0
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE
        self.logged_data = []  # To store all received order book data
        self.first_timestamp = None  # To store the first timestamp when data is logged

    async def get_snapshot(self):
        url = f"https://fapi.binance.com/fapi/v1/depth?symbol={self.instrument_name.upper()}&limit=10"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()

    def parse_restapi_response(self, response: Dict) -> Dict:
        return {
            "asks": [(Decimal(price), Decimal(quantity)) for price, quantity in response["asks"]],
            "bids": [(Decimal(price), Decimal(quantity)) for price, quantity in response["bids"]]
        }

    async def get_order_book_data(self, data: Dict) -> Dict:
        curr_pu = int(data["pu"])
        upper_u = int(data["U"])
        lower_u = int(data["u"])
        if self.pu == 0:
            if self.snapshot_required:
                self.snapshot_required = False
                try:
                    snapshot = await self.get_snapshot()
                    self.snapshot_update_id = snapshot["lastUpdateId"]
                    return self.parse_restapi_response(snapshot)
                except Exception as e:
                    logging.error(f"Error getting snapshot: {e}")
                    return {}
            if upper_u > self.snapshot_update_id or lower_u < self.snapshot_update_id:
                logging.info(f"Skipping pre-snapshot message: last snapshot_id: {self.snapshot_update_id}, upper_u: {upper_u}, lower_u: {lower_u}")
                return {}
        if curr_pu != self.pu and self.pu != 0:
            self.snapshot_required = True
            self.pu = 0
            self.snapshot_update_id = 0
            logging.info("Binance OB snapshot required")
            return {}
        self.pu = int(data["u"])
        return {
            "asks": [(Decimal(price), Decimal(quantity)) for price, quantity in data["a"]],
            "bids": [(Decimal(price), Decimal(quantity)) for price, quantity in data["b"]],
            "ts": datetime.utcnow() + timedelta(hours=8)  # Adjust for UTC+8
        }

    async def handle_message(self, message: str):
        try:
            data = json.loads(message)
            channel = data["stream"]
            event_data = data["data"]
            if channel == f"{self.instrument_name}@depth":
                orderbook = await self.get_order_book_data(event_data)
                if orderbook:
                    # Format timestamp to the required format
                    orderbook['ts'] = orderbook['ts'].strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
                    logging.info(f"Orderbook: {json.dumps(orderbook, default=str)}")
                    # Append orderbook data to the logged_data list
                    self.logged_data.append(orderbook)
                    # Capture the timestamp of the first message logged
                    if self.first_timestamp is None:
                        self.first_timestamp = orderbook['ts']  # Store the first timestamp as a datetime object
        except Exception as e:
            logging.error(f"Error handling message: {e}")
            logging.error(f"Message: {message}")

    async def subscribe(self, duration: int, output_format='parquet'):  # Added output_format parameter
        url = f"wss://fstream.binance.com/stream?streams={self.instrument_name}@depth"
        try:
            async with websockets.connect(url, ssl=self.ssl_context) as websocket:
                logging.info(f"Connected to {url}")
                
                # Stop the loop after the specified duration
                end_time = asyncio.get_event_loop().time() + duration
                
                while True:
                    # Check if the time is up
                    if asyncio.get_event_loop().time() > end_time:
                        logging.info(f"Time limit reached. Stopping subscription.")
                        break

                    try:
                        message = await websocket.recv()
                        await self.handle_message(message)
                    except asyncio.TimeoutError:
                        logging.error(f"WebSocket Timeout Error.")
        except Exception as e:
            logging.error(f"WebSocket error: {e}")
            await asyncio.sleep(5)  # Wait before attempting to reconnect

        # After the subscription duration, write the logged data to a specified file format
        self.write_to_file(duration, output_format=output_format)  # Pass the output_format parameter

    def write_to_file(self, duration, output_format='parquet'):
        # If no data was logged, skip writing the file
        if self.first_timestamp is None:
            logging.info("No data logged. Skipping file creation.")
            return

        # Convert the first timestamp to UTC +8
        local_timestamp = datetime.strptime(self.first_timestamp, '%Y-%m-%d %H:%M:%S.%f') + timedelta(hours=8)

        # Format the timestamp to the desired format: YYYYMMDD_HHMMSS
        formatted_timestamp = local_timestamp.strftime('%Y%m%d_%H%M%S')

        # Determine the filename based on the output format
        if output_format == 'json':
            filename = f"Binance_BTCUSDT_{formatted_timestamp}_{duration}.json"
            # Write the logged data to the JSON file
            with open(filename, 'w') as json_file:
                json.dump(self.logged_data, json_file, indent=4, default=str)
        elif output_format == 'parquet':
            filename = f"Binance_BTCUSDT_{formatted_timestamp}_{duration}.parquet"
            # Convert logged data to a DataFrame and rearrange columns to ts, bids, asks
            df = pd.DataFrame(self.logged_data)
            df = df[['ts', 'bids', 'asks']]  # Reorder columns
            df.to_parquet(filename, index=False)

        logging.info(f"Logged data written to {filename}")
        print(f"Data saved to {filename}")

# To run this in an interactive Python environment (like Jupyter Notebook)
# listener = BinanceOrderbookListener("btcusdt")
# await listener.subscribe(duration=10, output_format='parquet')

In [18]:
# To run this in an interactive Python environment (like Jupyter Notebook)
listener = BinanceOrderbookListener("btcusdt")
await listener.subscribe(duration=15, output_format='parquet')

Data saved to Binance_BTCUSDT_20241022_014858_15.parquet


## Optimization

In [14]:
import asyncio
import json
import logging
from decimal import Decimal
from datetime import datetime, timedelta
from websockets import connect
from typing import Dict, List

class OrderBookWebSocket:
    def __init__(self, instrument_name: str):
        self.instrument_name = instrument_name
        self.snapshot_required = True
        self.snapshot_update_id = None
        self.pu = 0  # Last processed update id
        self.first_timestamp = None
        self.logged_data: List[Dict] = []
        self.base_url = 'wss://fstream.binance.com/stream?streams='

    async def get_snapshot(self) -> Dict:
        # Fetch the initial order book snapshot from the REST API
        # This should be implemented according to your API call logic
        pass

    async def get_order_book_data(self, data: Dict) -> Dict:
        curr_pu = int(data["pu"])
        upper_u = int(data["U"])
        lower_u = int(data["u"])

        # If snapshot_required is true, get the snapshot right away
        if self.snapshot_required:
            try:
                snapshot = await self.get_snapshot()
                self.snapshot_update_id = snapshot["lastUpdateId"]
                self.snapshot_required = False  # Mark snapshot as fetched
                return self.parse_restapi_response(snapshot)
            except Exception as e:
                logging.error(f"Error getting snapshot: {e}")
                return {}

        # If the update is out of bounds, we can skip it
        if upper_u > self.snapshot_update_id or lower_u < self.snapshot_update_id:
            logging.info(f"Skipping pre-snapshot message: last snapshot_id: {self.snapshot_update_id}, upper_u: {upper_u}, lower_u: {lower_u}")
            return {}

        # If the current pu is not in the expected sequence, mark snapshot as required
        if curr_pu != self.pu and self.pu != 0:
            self.snapshot_required = True
            logging.info("Binance OB snapshot required")
            return {}

        self.pu = int(data["u"])  # Update the last processed sequence number
        return {
            "asks": [(Decimal(price), Decimal(quantity)) for price, quantity in data["a"]],
            "bids": [(Decimal(price), Decimal(quantity)) for price, quantity in data["b"]],
            "ts": datetime.utcnow() + timedelta(hours=8)  # Adjust for UTC+8
        }

    async def handle_message(self, message: str):
        try:
            data = json.loads(message)
            channel = data["stream"]
            event_data = data["data"]
            if channel == f"{self.instrument_name}@depth":
                orderbook = await self.get_order_book_data(event_data)
                if orderbook:
                    orderbook['ts'] = orderbook['ts'].strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
                    logging.info(f"Orderbook: {json.dumps(orderbook, default=str)}")
                    self.logged_data.append(orderbook)
                    if self.first_timestamp is None:
                        self.first_timestamp = orderbook['ts']
        except Exception as e:
            logging.error(f"Error handling message: {e}")
            logging.error(f"Message: {message}")

    async def listen(self):
        async with connect(self.base_url + f"{self.instrument_name}@depth") as websocket:
            while True:
                message = await websocket.recv()
                await self.handle_message(message)


In [20]:
# To run this in an interactive Python environment (like Jupyter Notebook)
listener = BinanceOrderbookListener("btcusdt")
await listener.subscribe(duration=5, output_format='parquet')  # Change 'parquet' to 'json' for JSON output

Data saved to Binance_BTCUSDT_20241022_015829_5.parquet


## Modified (limit 100)

In [25]:
import asyncio
import json
import ssl
import websockets
import aiohttp
import logging
from decimal import Decimal
from typing import Dict
from datetime import datetime, timedelta
import pandas as pd  # Import pandas for Parquet

# Set up logging to include milliseconds
logging.basicConfig(
    filename='orderbook.log',
    level=logging.INFO,
    format='%(asctime)s.%(msecs)03d - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

class BinanceOrderbookListener:
    def __init__(self, instrument: str):
        self.instrument = instrument
        self.instrument_name = instrument.lower()
        self.snapshot_required = True
        self.snapshot_update_id = 0
        self.pu = 0
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = False
        self.ssl_context.verify_mode = ssl.CERT_NONE
        self.logged_data = []  # To store all received order book data
        self.first_timestamp = None  # To store the first timestamp when data is logged
        self.bid_count = 0  # Count of bids logged for the current timestamp
        self.ask_count = 0  # Count of asks logged for the current timestamp

    async def get_snapshot(self):
        url = f"https://fapi.binance.com/fapi/v1/depth?symbol={self.instrument_name.upper()}&limit=10"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()

    def parse_restapi_response(self, response: Dict) -> Dict:
        return {
            "asks": [(Decimal(price), Decimal(quantity)) for price, quantity in response["asks"]],
            "bids": [(Decimal(price), Decimal(quantity)) for price, quantity in response["bids"]]
        }

    async def get_order_book_data(self, data: Dict) -> Dict:
        curr_pu = int(data["pu"])
        upper_u = int(data["U"])
        lower_u = int(data["u"])
        if self.pu == 0:
            if self.snapshot_required:
                self.snapshot_required = False
                try:
                    snapshot = await self.get_snapshot()
                    self.snapshot_update_id = snapshot["lastUpdateId"]
                    return self.parse_restapi_response(snapshot)
                except Exception as e:
                    logging.error(f"Error getting snapshot: {e}")
                    return {}
            if upper_u > self.snapshot_update_id or lower_u < self.snapshot_update_id:
                logging.info(f"Skipping pre-snapshot message: last snapshot_id: {self.snapshot_update_id}, upper_u: {upper_u}, lower_u: {lower_u}")
                return {}
        if curr_pu != self.pu and self.pu != 0:
            self.snapshot_required = True
            self.pu = 0
            self.snapshot_update_id = 0
            logging.info("Binance OB snapshot required")
            return {}
        self.pu = int(data["u"])
        return {
            "asks": [(Decimal(price), Decimal(quantity)) for price, quantity in data["a"]],
            "bids": [(Decimal(price), Decimal(quantity)) for price, quantity in data["b"]],
            "ts": datetime.utcnow() + timedelta(hours=8)  # Adjust for UTC+8
        }

    async def handle_message(self, message: str):
        try:
            data = json.loads(message)
            channel = data["stream"]
            event_data = data["data"]
            if channel == f"{self.instrument_name}@depth":
                orderbook = await self.get_order_book_data(event_data)
                if orderbook:
                    # Format timestamp to the required format
                    orderbook['ts'] = orderbook['ts'].strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
                    
                    # Initialize counts if it's a new timestamp
                    if self.first_timestamp is None or orderbook['ts'] != self.first_timestamp:
                        self.first_timestamp = orderbook['ts']
                        self.bid_count = 0
                        self.ask_count = 0

                    # Limit the number of bids and asks to 100
                    if self.bid_count < 100 and self.ask_count < 100:
                        # Append to logged_data and increment counts
                        self.logged_data.append(orderbook)
                        self.bid_count += len(orderbook['bids'])
                        self.ask_count += len(orderbook['asks'])

                    logging.info(f"Orderbook: {json.dumps(orderbook, default=str)}")
        except Exception as e:
            logging.error(f"Error handling message: {e}")
            logging.error(f"Message: {message}")

    async def subscribe(self, duration: int, output_format='parquet'):
        url = f"wss://fstream.binance.com/stream?streams={self.instrument_name}@depth"
        try:
            async with websockets.connect(url, ssl=self.ssl_context) as websocket:
                logging.info(f"Connected to {url}")
                
                # Stop the loop after the specified duration
                end_time = asyncio.get_event_loop().time() + duration
                
                while True:
                    # Check if the time is up
                    if asyncio.get_event_loop().time() > end_time:
                        logging.info(f"Time limit reached. Stopping subscription.")
                        break

                    try:
                        message = await websocket.recv()
                        await self.handle_message(message)
                    except asyncio.TimeoutError:
                        logging.error(f"WebSocket Timeout Error.")
        except Exception as e:
            logging.error(f"WebSocket error: {e}")
            await asyncio.sleep(5)  # Wait before attempting to reconnect

        # After the subscription duration, write the logged data to a specified file format
        self.write_to_file(duration, output_format=output_format)

    def write_to_file(self, duration, output_format='parquet'):
        # If no data was logged, skip writing the file
        if self.first_timestamp is None:
            logging.info("No data logged. Skipping file creation.")
            return

        # Convert the first timestamp to UTC +8
        local_timestamp = datetime.strptime(self.first_timestamp, '%Y-%m-%d %H:%M:%S.%f') + timedelta(hours=8)

        # Format the timestamp to the desired format: YYYYMMDD_HHMMSS
        formatted_timestamp = local_timestamp.strftime('%Y%m%d_%H%M%S')

        # Determine the filename based on the output format
        if output_format == 'json':
            filename = f"Binance_BTCUSDT_{formatted_timestamp}_{duration}.json"
            # Write the logged data to the JSON file
            with open(filename, 'w') as json_file:
                json.dump(self.logged_data, json_file, indent=4, default=str)
        elif output_format == 'parquet':
            filename = f"Binance_BTCUSDT_{formatted_timestamp}_{duration}.parquet"
            # Convert logged data to a DataFrame and rearrange columns to ts, bids, asks
            df = pd.DataFrame(self.logged_data)
            df = df[['ts', 'bids', 'asks']]  # Reorder columns
            df.to_parquet(filename, index=False)

        logging.info(f"Logged data written to {filename}")
        print(f"Data saved to {filename}")

# To run this in an interactive Python environment (like Jupyter Notebook)
# listener = BinanceOrderbookListener("btcusdt")
# await listener.subscribe(duration=10, output_format='parquet')


In [26]:
# In a Jupyter Notebook, you can directly use 'await' instead of 'asyncio.run()'
listener = BinanceOrderbookListener("btcusdt")
await listener.subscribe(duration=10, output_format='parquet')

Data saved to Binance_BTCUSDT_20241022_021225_10.parquet
