<a href="https://colab.research.google.com/github/reosiain/app/blob/master/notebooks/ws_api_prototype.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
!pip install polars
!pip install websocket-client
!pip install rel



In [17]:
import requests
import datetime
import os
import time
import json
import math
from dataclasses import dataclass, asdict
import pandas as pd
import polars as pl
from polars import string_cache


In [7]:
URL_BASE = "https://www.okex.com"
HEADERS = {
    "OK-ACCESS-KEY": os.getenv('API_KEY'),
    "OK-ACCESS-SIGN": os.getenv('API_SIGNATURE')
}

HEADERS = {
    "OK-ACCESS-KEY": "67a8941d-5f77-4322-8c55-c1650e683673",
    "OK-ACCESS-SIGN": "D6E1DF1DE6D87013D409D29FBCA56E9B",
    #'OK-ACCESS-TIMESTAMP': str(int(datetime.datetime.now().timestamp())*1000-1000000),
}

In [48]:
@dataclass
class HistCandles:
    dt: list
    o: list
    h: list
    l: list
    c: list
    vol_count: list
    vol_coin: list

    def __init__(self) -> None:
      self.dt = []
      self.o = []
      self.h = []
      self.l = []
      self.c = []
      self.vol_count = []
      self.vol_coin = []

    def add(self, line) -> None:
      self.dt.append(line[0])
      self.o.append(line[1])
      self.h.append(line[2])
      self.l.append(line[3])
      self.c.append(line[4])
      self.vol_count.append(line[5])
      self.vol_coin.append(line[6])

    def to_polars(self) -> pl.DataFrame():
      return pl.DataFrame(asdict(self))



def to_unix(dt: datetime.datetime) -> int:
    return int(dt.timestamp() * 1000)


def to_isodate(dt: str) -> str:
    dt = int(dt) / 1000
    dt = datetime.datetime.utcfromtimestamp(int(dt))
    return dt.isoformat()

In [96]:
################ General 

def get_instruments(name: str, type:str = "SPOT") -> dict:
    """
    
    """
    global HEADERS
    global URL_BASE

    body = {"instType": type, "uly":name}

    a = requests.get(
        URL_BASE + "/api/v5/public/instruments", headers=HEADERS, params=body
    )
    return a.json()["data"]

def get_tickers(type:str = "SPOT") -> dict:
    """
    Retrieve the latest price snapshot, best bid/ask price, and trading volume in the last 24 hours.
    """
    global HEADERS
    global URL_BASE

    body = {"instType": type}

    a = requests.get(
        URL_BASE + "/api/v5/market/tickers", headers=HEADERS, params=body
    )
    return a.json()["data"]

In [39]:
def get_hist_candles(
    pair: str, older_time: datetime.datetime, recent_time: datetime.datetime = None, verbose:bool = False
) -> HistCandles:
    """
    Retrieve history candlestick charts from recent years (in lists).
    """
    if recent_time is None:
        now = to_unix(datetime.datetime.now())
    else:
        now = to_unix(recent_time)

    step_msec = 60000
    per_page = 100
    global HEADERS
    global URL_BASE
    body = {
        "instId": pair,
        "after": str(now),
    }

    loops = math.ceil(((now - to_unix(older_time)) / step_msec) / per_page)
    data = HistCandles()

    for step in range(loops):
        r = requests.get(
            URL_BASE + "/api/v5/market/history-candles", headers=HEADERS, params=body
        ).json()["data"]

        if verbose:
          print(to_isodate(str(now)), r)

        if len(r) == 0:
          now -= step*step_msec*per_page # Step back in time if empty response
        else:
          now = r[-1][0] # Time of the last candle in this batch

        for cndl in r:
            data.add((cndl[i] if i != 0 else to_isodate(cndl[i]) for i in range(6)))
        
        body["after"] = str(now)

        time.sleep(0.06)

    return data

In [78]:
def get_latest_candles(pair: str,  verbose:bool = False) -> HistCandles:
    """
    Retrieve the candlestick charts. This endpoint can retrieve the latest 1,440 data entries. 
    Charts are returned in groups based on the requested bar. 
    """
    global HEADERS
    global URL_BASE

    body = {"instId": pair}

    data = HistCandles()

    for step in range(10):
        r = requests.get(
            URL_BASE + "/api/v5/market/candles", headers=HEADERS, params=body
        ).json()["data"]
        for cndl in r:
            data.add(tuple(cndl[i] if i != 0 else to_isodate(cndl[i]) for i in range(7)))
        
        body["after"] = str(r[-1][0])

        if verbose:
          print(to_isodate(body["after"]), r)

        time.sleep(0.03)

    return data

In [None]:
from typing_extensions import ParamSpec
import websocket
import ssl
import json

import threading 
from threading import Lock 

input_container = []
output_container = []

ws_address = 'wss://wspap.okx.com:8443/ws/v5/public?brokerId=9999'

params = {
    "op": "subscribe",
    "args": [
      {
        "channel": "candle1m",
        "instId": "DOT-USDT"
      },
      {
        "channel": "candle1m",
        "instId": "AVAX-USDT"
      },
      {
        "channel": "candle1m",
        "instId": "ADA-USDT"
      },
      {
        "channel": "candle1m",
        "instId": "DOGE-USDT"
      },
      {
        "channel": "candle1m",
        "instId": "SOL-USDT"
      },
      {
        "channel": "candle1m",
        "instId": "LUNA-USDT"
      },
      {
        "channel": "candle1m",
        "instId": "XRP-USDT"
      }
      ,
      {
        "channel": "candle1m",
        "instId": "ETH-USDT"
      }
    ]
}

class Ws:
  def __init__(self, path:str, params:dict, lock: Lock) -> None:

    self.subscription_params = params
    websocket.enableTrace(False)
    self.ws = websocket.WebSocketApp(path,
                            on_open = self.on_open,
                            on_close = self.on_close, 
                            on_message = self.on_message,
                            on_error=self.on_error
                          )


  @staticmethod
  def on_open(ws):
    print('Opened Connection')
    ws.send(json.dumps(params))
  
  @staticmethod
  def on_close(ws, close_status_code, close_msg):
      print('Closed Connection')

  @staticmethod
  def on_message(ws, message):
      print(json.loads(message))
      global lock
      global input_container
      with lock:
        input_container.append(json.loads(message))
  
  @staticmethod
  def on_error(ws, err):
    print("Got a an error: ", err)

  def run(self):
    self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})



In [23]:
class Socket_tread(threading.Thread):
    def __init__(self, name, lock, **kwargs):
        threading.Thread.__init__(self)
        self.name = name
        self.ws = Ws(kwargs['path'], kwargs['params'], lock)
        self.lock = lock

    def run(self):
        global intermediate_container
        self.ws.run()


class Mongo_thread(threading.Thread):
    def __init__(self, name, lock):
        threading.Thread.__init__(self)
        self.name = name
        self.lock = lock 

    def run(self):
        global input_container
        global output_container
        
        while True:
          if len(input_container) == 10:
            with self.lock:
              output_container = input_container.copy()
              input_container = []

            with open(r"new_", 'a') as f:
              f.write('Good\n')


In [24]:
lock = Lock()
s = Socket_tread('ws', lock, path=ws_address, params = params)
r = Mongo_thread('mongo', lock)

In [26]:
s.run()
r.run()

Opened Connection
{'event': 'subscribe', 'arg': {'channel': 'candle1m', 'instId': 'DOT-USDT'}}
{'arg': {'channel': 'candle1m', 'instId': 'DOT-USDT'}, 'data': [['1650638820000', '18.214', '18.217', '18.214', '18.217', '6508.707196', '118559.897806']]}
{'event': 'error', 'msg': "channel:candle1m,instId:AVAX-USDT doesn't exist", 'code': '60018'}
{'event': 'subscribe', 'arg': {'channel': 'candle1m', 'instId': 'ADA-USDT'}}
{'arg': {'channel': 'candle1m', 'instId': 'ADA-USDT'}, 'data': [['1650638820000', '0.90366', '0.90425', '0.90366', '0.90425', '317180.6955', '286736.68803368']]}
{'event': 'subscribe', 'arg': {'channel': 'candle1m', 'instId': 'DOGE-USDT'}}
{'arg': {'channel': 'candle1m', 'instId': 'DOGE-USDT'}, 'data': [['1650638820000', '0.135943', '0.135998', '0.135943', '0.135976', '1914222.431149', '260275.431487']]}
{'event': 'subscribe', 'arg': {'channel': 'candle1m', 'instId': 'SOL-USDT'}}
{'arg': {'channel': 'candle1m', 'instId': 'SOL-USDT'}, 'data': [['1650638820000', '100.491', 

KeyboardInterrupt: ignored

In [22]:
intermediate_container =[1,2,3,45]

In [26]:
for elem in intermediate_container:
  del elem

In [30]:
r = intermediate_container.copy()

In [32]:
r

[2, 3, 45]

In [2]:
with open(r"new_", 'w') as f:
  f.write('Good\n')

In [16]:
len(input_container)

0

In [53]:
f.write('sdasd')

ValueError: ignored