In [1]:
%load_ext autoreload
%autoreload 2

In [16]:

# coding: utf-8
import requests
import json
import urllib.parse
import logging
import logging.config

import pandas as pd
import numpy as np
os.environ['INIDIR'] = '/Users/macico/Dropbox/kaggle/kuloko/ini'
os.environ['LOGDIR'] = '/Users/macico/Dropbox/kaggle/kuloko/log'
INIDIR=os.environ['INIDIR'] 
LOGDIR=os.environ['LOGDIR'] 

In [17]:
import configparser

sys.path.append(os.path.join(os.path.dirname('__file__'),'..'))
import  kuloko_handler.handler.api_handler as api
private_api_ini = configparser.ConfigParser()
private_api_ini.read(os.path.join(INIDIR,'private_api.ini'), encoding='utf-8')

general_config_ini = configparser.ConfigParser()
general_config_ini.read(os.path.join(INIDIR,'config.ini'), encoding='utf-8')

logging.config.fileConfig(os.path.join(INIDIR,'logconfig.ini'),defaults={'logfilename': os.path.join(LOGDIR,'logging.log')})
logger = logging.getLogger("KULOKO")

In [25]:
# websocketを使ってtickerをリアルタイム取得
import json
import websocket
from collections import deque
import threading
import time

websocket.enableTrace(True) #trace ON

class Socket(object):
    def __init__(self,channel,logger, general_config_ini,private_api_ini,general_config_mode="DEFAULT",private_api_mode="DEFAULT"):
        self._logger = logger
        if general_config_ini is None:
            general_config_ini = configparser.ConfigParser()
            general_config_ini.read('../ini/config.ini', encoding='utf-8')
            self._logger.info('[DONE]Load General Config.')
        if private_api_ini is None:            
            private_api_ini = configparser.ConfigParser()
            private_api_ini.read('../private_api.ini', encoding='utf-8')
            self._logger.info('[DONE]Load Private API Config.')

        self.load_config( general_config_ini,private_api_ini,general_config_mode,private_api_mode)

        self.channel = channel
        self.load_urls()

    def load_config(self,general_config_ini,private_api_ini,general_config_mode,private_api_mode):
        self.private_api_config = private_api_ini[private_api_mode]
        self.general_config = general_config_ini[general_config_mode]
        self._logger.info('[DONE]Load Config. Private API:[{0}] General:[{1}]'
            .format(private_api_mode,general_config_mode))


    def load_urls(self):
        self.url_parts = {
            'socket_endpoint':self.general_config.get('SOCKET_ENDPOINT_URL'),
            'public':self.general_config.get('SOCKET_PUBLIC_URL'),
            'private':self.general_config.get('SOCKET_PRIVATE_URL'),
        }
        self._logger.info('[DONE]Set URL parts')


    def connect(self,url,sym,maxlen=100):
        self.url = url
        self.maxlen =maxlen
        self.sym = sym
        self.queue = deque([],self.maxlen)
        self.ws = websocket.WebSocketApp(
            url,
            on_message = self.on_message,on_open=self.on_open,
            on_error = self.on_error, on_close = self.on_close)
        logger.info("Socket Connected")
    
    def subscribe(self):
        self.ws.keep_running = True 
        self.thread = threading.Thread(target=lambda: self.ws.run_forever())
        self.thread.daemon = True
        self.thread.start()
        logger.info("Start to subscribe")

    def is_connected(self):
        flag =  self.ws.sock and self.ws.sock.connected
        logger.info("Is connected:{0}".format(flag))

    def disconnect(self):
        self.ws.keep_running = False
        self.ws.close()
        logger.info("Socket closed")

    def get(self):
        queue_len = len(self.queue)
        return [ self.queue.popleft() for _ in range(queue_len)]

    def clean_data(self, return_data):
        for _i in range(return_data):
            return_data[_i]['timestamp'] =return_data[_i]['timestamp']
            return_data[_i]['price'] =return_data[_i]['price']
            return_data[_i]['size'] =return_data[_i]['size']
        return return_data

    def on_message(self, message):
        logger.info('Received:{0}'.format(message))
        self.queue.append(json.loads(message))
        if len(self.queue) > self.maxlen:
            logger.warn("Message queue is full. Old item are discarded")

    def on_error(self, error):
        logger.error('Error. Try reconnect {0}'.format(error),exc_info=True)
        self.disconnect()
        time.sleep(0.5)
        self.connect(self.url , self.sym)

    def on_close(self):
        message = {
            "command": "unsubscribe",
            "channel": self.channel,
            "symbol": self.sym 
        }
        self.ws.send(json.dumps(message))
        logger.info('Websocket disconnected')

    def on_open(self):
        message = {
            "command": "subscribe",
            "channel": self.channel,
            "symbol": self.sym 
        }
        self.ws.send(json.dumps(message))
        logger.info('Socket opened')

class Trade(Socket):
    def __init__(self,logger, eneral_config_ini,private_api_ini):
        super().__init__("trades",logger, general_config_ini,private_api_ini)
    
    def get_url(self):
        path = os.path.join(self.url_parts['socket_endpoint'],self.url_parts['public'])
        return path

class Orderbooks(Socket):
    def __init__(self,logger, eneral_config_ini,private_api_ini):
        super().__init__("orderbooks",logger, general_config_ini,private_api_ini)
    
    def get_url(self):
        path = os.path.join(self.url_parts['socket_endpoint'],self.url_parts['public'])
        return path

class Ticker(Socket):
    def __init__(self,logger, eneral_config_ini,private_api_ini):
        super().__init__("ticker",logger, general_config_ini,private_api_ini)
    
    def get_url(self):
        path = os.path.join(self.url_parts['socket_endpoint'],self.url_parts['public'])
        return path





# trade = Trade(')
    # while True:
    #     print(bfrt.get())
    #     time.sleep(0.5)_

In [26]:
trade = Trade(logger, general_config_ini,private_api_ini)

In [27]:
trade.connect(trade.get_url(),'BTC')

In [28]:
trade.subscribe()


In [29]:
trade.get()

[]

In [30]:
trade.disconnect()

In [31]:
orderbooks = Orderbooks(logger, general_config_ini,private_api_ini)

In [32]:
orderbooks.connect(orderbooks.get_url(),'BTC')

In [34]:
orderbooks.subscribe()

In [37]:
orderbooks.disconnect()

In [39]:
ticker = Ticker(logger, general_config_ini,private_api_ini)

In [40]:
ticker.connect(ticker.get_url(),'BTC')

In [41]:
ticker.subscribe()

In [43]:
ticker.get()

[]

In [45]:
ticker.disconnect()