In [11]:
pip install websocket

Collecting websocket
  Downloading websocket-0.2.1.tar.gz (195 kB)
     -------------------------------------- 195.3/195.3 kB 6.0 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: websocket
  Building wheel for websocket (setup.py): started
  Building wheel for websocket (setup.py): finished with status 'done'
  Created wheel for websocket: filename=websocket-0.2.1-py3-none-any.whl size=192132 sha256=2bdd76a24183531ebb240487dbaf51d88fd11a4106a9fc3e7a2e529c72518101
  Stored in directory: c:\users\administrator\appdata\local\pip\cache\wheels\cd\8c\c5\42beaa658f4825f4dc80634c34c5a4bb564cdd75545346fa93
Successfully built websocket
Installing collected packages: websocket
Successfully installed websocket-0.2.1

[notice] A new release of pip available: 22.1.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.

In [16]:
pip install websocket-client


[notice] A new release of pip available: 22.1.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.


In [None]:
pip install nest_asyncio

In [9]:
# For running in Jupyter.
# NOTE(review): only the import is visible here; nothing in view calls
# nest_asyncio.apply(), and the collectors below use threads rather than
# asyncio -- confirm this import is still needed.
import nest_asyncio

In [None]:
# Korbit assignment: confirmed that this code also runs on Google Colab (2025-04-04)

import json
import time
import threading
import logging
import requests
import websocket
from datetime import datetime

# Route every log record at INFO level or above to collector.log,
# prefixed with the time and the level name.
logging.basicConfig(
    filename="collector.log",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

class BaseCollector(threading.Thread):
    """Daemon-thread base class that stores order-book entries in hourly JSON files.

    Each file is named ``<exchange>_orderbook_<YYYYMMDD_HH>.json`` (UTC hour,
    exchange name lower-cased), is created in the current working directory,
    and holds a single JSON array of entries. Subclasses implement ``run`` and
    call ``write_entry`` for every snapshot they obtain.
    """

    def __init__(self, exchange_name):
        """Initialise the thread with no output file open yet.

        Args:
            exchange_name: Label used in log lines and, lower-cased, in file names.
        """
        super().__init__()
        self.exchange = exchange_name
        self.daemon = True  # so threads exit on main exit
        self.current_hour = None
        self.file = None
        # Entries written to the currently open file. Used to decide whether a
        # separating comma is needed; the file position is not reliable for
        # this because text-mode newline translation changes the byte count
        # ("[\n" is three bytes when "\n" is written as "\r\n").
        self._entries_in_file = 0
        # Serialises write/rotate/close so close() may be called from another thread.
        self._file_lock = threading.Lock()
        # Ensures the "entries are being dropped" notice is logged only once.
        self._drop_logged = False

    def _close_current_file(self):
        """Terminate the JSON array and close the current file, if it is still open."""
        if self.file is not None and not self.file.closed:
            self.file.write("\n]")
            self.file.close()

    def _open_new_file(self, hour_ts=None):
        """Open a new JSON file for a new hour.

        Args:
            hour_ts: Hour label (``YYYYMMDD_HH``) for the new file. When omitted
                the current UTC hour is used.
        """
        # Close the previous file properly by ending the JSON array
        self._close_current_file()
        if hour_ts is None:
            hour_ts = datetime.utcnow().strftime("%Y%m%d_%H")  # e.g., 20250404_14
        filename = f"{self.exchange.lower()}_orderbook_{hour_ts}.json"
        self.file = open(filename, "w", encoding="utf-8")
        self.file.write("[\n")  # start JSON array
        self.current_hour = hour_ts
        self._entries_in_file = 0
        logging.info(f"{self.exchange}: Started new data file {filename}")

    def write_entry(self, entry):
        """Write a single orderbook entry (timestamp, bids, asks) to JSON file.

        Rotates to a new file when the UTC hour changes. Once the current file
        has been closed (via ``close()`` or directly on ``self.file``), further
        entries are ignored instead of raising on the closed file.

        Args:
            entry: JSON-serialisable object to append to the current array.
        """
        # Serialise first so a non-serialisable entry cannot leave a dangling comma.
        json_str = json.dumps(entry)
        with self._file_lock:
            if self.file is not None and self.file.closed:
                if not self._drop_logged:
                    self._drop_logged = True
                    logging.info(f"{self.exchange}: Output file is closed; dropping further entries")
                return
            # Compute the hour once so the rotation check and the new file name agree.
            hour_ts = datetime.utcnow().strftime("%Y%m%d_%H")
            if self.current_hour != hour_ts or self.file is None:
                # rotate to new file at hour boundary
                self._open_new_file(hour_ts)
            # Every entry after the first is preceded by a comma + newline.
            if self._entries_in_file:
                self.file.write(",\n")
            self.file.write(json_str)
            self._entries_in_file += 1
            # Flush to ensure data is written to disk
            self.file.flush()

    def close(self):
        """Finalise the current output file so it holds a complete JSON array.

        Safe to call more than once and from a thread other than the collector's.
        """
        with self._file_lock:
            self._close_current_file()

    def log(self, message):
        """Log ``message`` at INFO level, prefixed with the exchange name."""
        logging.info(f"{self.exchange}: {message}")

    def run(self):
        """Each subclass should implement its own run method."""
        raise NotImplementedError

# Collector for Upbit using WebSocket
class UpbitCollector(BaseCollector):
    """Collects the top 10 KRW-BTC order-book levels from Upbit's WebSocket feed."""

    def __init__(self):
        super().__init__("Upbit")

    def run(self):
        """Connect, subscribe, and keep reconnecting for the life of the thread.

        Reconnection is a plain loop around ``run_forever``. Re-entering
        ``run_forever`` from inside ``on_close`` would nest every new
        connection inside the previous one's call stack, which grows without
        bound and eventually raises ``RecursionError``.
        """
        ws_url = "wss://api.upbit.com/websocket/v1"  # Upbit public WS endpoint
        # Subscription message to orderbook (Upbit expects an array of JSON messages)
        subscribe_payload = [
            {"ticket": "ORDERBOOK_COLLECTOR"},
            {"type": "orderbook", "codes": ["KRW-BTC"]}
        ]

        def on_open(ws):
            self.log("WebSocket connection opened")
            ws.send(json.dumps(subscribe_payload))
            self.log("Subscribed to Upbit KRW-BTC orderbook")

        def on_message(ws, message):
            try:
                data = json.loads(message)
            except ValueError:  # JSONDecodeError, or undecodable bytes
                return  # skip if malformed
            # Only dict-shaped orderbook updates are of interest.
            if not isinstance(data, dict) or data.get("type") != "orderbook":
                return
            order_units = data.get("orderbook_units")
            if not order_units:
                return
            # Each unit carries one bid level and one ask level; keep the first 10.
            levels = order_units[:10]
            entry = {
                "timestamp": data.get("timestamp"),  # Upbit provides timestamp in ms
                "bids": [[unit["bid_price"], unit["bid_size"]] for unit in levels],
                "asks": [[unit["ask_price"], unit["ask_size"]] for unit in levels]
            }
            self.write_entry(entry)

        def on_error(ws, error):
            self.log(f"WebSocket error: {error}")

        def on_close(ws, status_code, msg):
            # Only log here; the loop below performs the actual reconnect.
            self.log(f"WebSocket closed (code={status_code}). Reconnecting...")

        # Start the WebSocket connection loop
        self.log("Starting Upbit WebSocket collector")
        while True:
            ws_app = websocket.WebSocketApp(ws_url, on_open=on_open,
                                            on_message=on_message,
                                            on_error=on_error, on_close=on_close)
            try:
                ws_app.run_forever()  # blocks until the connection ends
            except Exception as e:
                self.log(f"Exception in run_forever: {e}")
            # Short delay before the next connection attempt
            time.sleep(1)

# Collector for Bithumb using REST
class BithumbCollector(BaseCollector):
    """Polls Bithumb's public REST order-book endpoint for BTC_KRW about twice a second."""

    def __init__(self):
        super().__init__("Bithumb")

    def run(self):
        """Poll the order book in a loop and store the top 10 bids and asks.

        Request failures are logged and retried after 0.2 s. Failures while
        parsing or storing a response are logged and the loop continues, so a
        single malformed payload does not end the collector thread.
        """
        self.log("Starting Bithumb REST collector")
        url = "https://api.bithumb.com/public/orderbook/BTC_KRW?count=10"
        # One session for the whole loop so the HTTP connection is reused
        # between polls instead of being re-established every time.
        session = requests.Session()
        while True:
            start_time = time.time()
            try:
                resp = session.get(url, timeout=1)
                data = resp.json()
            except Exception as e:
                self.log(f"REST request error: {e}")
                time.sleep(0.2)
                continue
            try:
                if data.get("status") == "0000":
                    orderbook = data.get("data", {})
                    # The Bithumb REST returns bids/asks lists with "price" and "quantity"
                    bids = orderbook.get("bids", [])
                    asks = orderbook.get("asks", [])
                    top_bids = [[float(item["price"]), float(item["quantity"])] for item in bids[:10]]
                    top_asks = [[float(item["price"]), float(item["quantity"])] for item in asks[:10]]
                    entry = {
                        "timestamp": orderbook.get("timestamp"),  # in ms (string format)
                        "bids": top_bids,
                        "asks": top_asks
                    }
                    self.write_entry(entry)
                else:
                    # API returned an error status
                    err_code = data.get("status")
                    self.log(f"API returned error status {err_code}")
            except (AttributeError, KeyError, TypeError, ValueError, OSError) as e:
                # Unexpected payload shape, non-numeric price/quantity, or a
                # failed file write: log it and keep polling.
                self.log(f"Failed to process orderbook response: {e}")
            # Maintain ~0.5s interval (taking into account request time)
            elapsed = time.time() - start_time
            if elapsed < 0.5:
                time.sleep(0.5 - elapsed)

# Collector for Korbit using WebSocket
class KorbitCollector(BaseCollector):
    """Collects the top 10 btc_krw order-book levels from Korbit's WebSocket feed."""

    def __init__(self):
        super().__init__("Korbit")

    def run(self):
        """Connect, subscribe, and keep reconnecting for the life of the thread.

        Reconnection is a plain loop around ``run_forever``. Re-entering
        ``run_forever`` from inside ``on_close`` would nest every new
        connection inside the previous one's call stack, which grows without
        bound and eventually raises ``RecursionError``.
        """
        # NOTE(review): endpoint path kept as-is -- confirm against the current Korbit docs.
        ws_url = "wss://ws-api.korbit.co.kr/v2/ws"
        # Korbit expects a subscription message as a JSON array (old API format)
        # As per Korbit docs: [{"method":"subscribe","type":"orderbook","symbols":["btc_krw"]}]
        subscribe_payload = [{"method": "subscribe", "type": "orderbook", "symbols": ["btc_krw"]}]

        def on_open(ws):
            self.log("WebSocket connection opened")
            ws.send(json.dumps(subscribe_payload))
            self.log("Subscribed to Korbit btc_krw orderbook")

        def on_message(ws, message):
            try:
                data = json.loads(message)
            except ValueError:  # JSONDecodeError, or undecodable bytes
                return
            # Korbit sends messages with a "type" field indicating the data type
            if not isinstance(data, dict) or data.get("type") != "orderbook":
                return
            # The snapshot or update data is inside data["data"]
            ob = data.get("data") or {}
            bids = ob.get("bids", [])
            asks = ob.get("asks", [])
            if not bids and not asks:
                return  # nothing to record; avoids storing empty entries
            entry = {
                "timestamp": ob.get("timestamp") or data.get("timestamp"),
                "bids": [[float(item["price"]), float(item["qty"])] for item in bids[:10]],
                "asks": [[float(item["price"]), float(item["qty"])] for item in asks[:10]]
            }
            self.write_entry(entry)

        def on_error(ws, error):
            self.log(f"WebSocket error: {error}")

        def on_close(ws, status_code, msg):
            # Only log here; the loop below performs the actual reconnect.
            self.log(f"WebSocket closed (code={status_code}). Reconnecting...")

        self.log("Starting Korbit WebSocket collector")
        while True:
            ws_app = websocket.WebSocketApp(ws_url, on_open=on_open,
                                            on_message=on_message,
                                            on_error=on_error, on_close=on_close)
            try:
                ws_app.run_forever()  # blocks until the connection ends
            except Exception as e:
                self.log(f"WebSocket run_forever exception: {e}")
            # Short delay before the next connection attempt
            time.sleep(1)

# Collector for Coinone using WebSocket
class CoinoneCollector(BaseCollector):
    """Collects the top 10 KRW/BTC order-book levels from Coinone's WebSocket feed."""

    def __init__(self):
        super().__init__("Coinone")

    def run(self):
        """Connect, subscribe, and keep reconnecting for the life of the thread.

        Reconnection is a plain loop around ``run_forever``. Re-entering
        ``run_forever`` from inside ``on_close`` would nest every new
        connection inside the previous one's call stack, which grows without
        bound and eventually raises ``RecursionError``.
        """
        ws_url = "wss://stream.coinone.co.kr"
        # Coinone expects a JSON with request_type, channel, topic for subscription
        subscribe_payload = {
            "request_type": "SUBSCRIBE",
            "channel": "ORDERBOOK",
            "topic": {"quote_currency": "KRW", "target_currency": "BTC"}
        }

        def on_open(ws):
            self.log("WebSocket connection opened")
            ws.send(json.dumps(subscribe_payload))
            self.log("Subscribed to Coinone ORDERBOOK KRW-BTC")

        def on_message(ws, message):
            try:
                data = json.loads(message)
            except ValueError:  # JSONDecodeError, or undecodable bytes
                return
            # Coinone's response might contain 'channel': 'ORDERBOOK' and the data in e.g. 'data' or 'body'
            if not isinstance(data, dict) or data.get("channel") != "ORDERBOOK":
                return
            content = data.get("data") or data.get("body") or data  # find where the orderbook content is
            if not isinstance(content, dict):
                return
            # Coinone orderbook snapshot content includes bids/asks lists
            bids = content.get("bids", [])
            asks = content.get("asks", [])
            if not bids and not asks:
                # ORDERBOOK-channel messages without any levels (presumably
                # subscription acknowledgements -- confirm against Coinone's
                # docs) would otherwise be stored as empty entries with a
                # null timestamp.
                return
            entry = {
                "timestamp": content.get("timestamp") or data.get("timestamp"),
                "bids": [[float(item["price"]), float(item["qty"])] for item in bids[:10]],
                "asks": [[float(item["price"]), float(item["qty"])] for item in asks[:10]]
            }
            self.write_entry(entry)

        def on_error(ws, error):
            self.log(f"WebSocket error: {error}")

        def on_close(ws, status_code, msg):
            # Only log here; the loop below performs the actual reconnect.
            self.log(f"WebSocket closed (code={status_code}). Reconnecting...")

        self.log("Starting Coinone WebSocket collector")
        while True:
            ws_app = websocket.WebSocketApp(ws_url, on_open=on_open,
                                            on_message=on_message,
                                            on_error=on_error, on_close=on_close)
            try:
                ws_app.run_forever()  # blocks until the connection ends
            except Exception as e:
                self.log(f"WebSocket run_forever exception: {e}")
            # Short delay before the next connection attempt
            time.sleep(1)


In [None]:
# Main section to start all collectors
if __name__ == "__main__":
    RUN_SECONDS = 600  # how long to collect: 600 seconds = 10 minutes

    collectors = [
        KorbitCollector(),
        UpbitCollector(),
        BithumbCollector(),
        CoinoneCollector()
    ]
    for collector in collectors:
        collector.start()
    try:
        # Let collectors run for the desired duration
        time.sleep(RUN_SECONDS)
        logging.info("Main: 10 minutes elapsed, stopping collectors.")
    except KeyboardInterrupt:
        # Interrupting the cell must still finalise the files below.
        logging.info("Main: interrupted, stopping collectors.")
    finally:
        # Terminate each JSON array so the files on disk are valid JSON.
        # NOTE: the collector threads are daemon threads and receive no stop
        # signal here; they only end when the process (kernel) exits.
        for collector in collectors:
            try:
                if collector.file and not collector.file.closed:
                    collector.file.write("\n]")
                    collector.file.close()
            except Exception as e:
                logging.error(f"Error closing file for {collector.exchange}: {e}")