In [8]:
import websocket
import json 
import logging
from datetime import datetime
import csv


class CryptoExtractSchedule:
    """Collect closed kline candles from a Binance websocket and dump them to CSV.

    Closed candles are buffered in ``json_data_collection`` and written to a
    new timestamped CSV file once at least ``flush_minutes`` minutes have
    passed since the previous flush, and again when the connection closes.

    Args:
        streams: List of stream names sent as ``params`` of the SUBSCRIBE request.
        interval: Kline interval label; stored on the instance, not used by the
            class itself.
        output_path: Prefix prepended verbatim to the CSV file name, so a
            directory must end with a path separator.
        flush_minutes: Minimum number of minutes between periodic CSV dumps.
    """

    def __init__(self, streams, interval, output_path, flush_minutes=10):
        self.streams = streams
        self.interval = interval
        self.output_path = output_path
        self.flush_minutes = flush_minutes
        self.json_data_collection = []
        # Initialised here so on_message works even if it runs before on_open.
        self.prev_date = datetime.now()

    def write_to_csv(self):
        """Write the buffered candles to a new CSV file and clear the buffer.

        Does nothing when the buffer is empty. The buffer is cleared only
        after the file has been written successfully.
        """
        print(f'Data collection Value = {self.json_data_collection}')###

        if not self.json_data_collection:
            return

        # Same name as before: day-month-year immediately followed by H-M-S.
        dt_string = datetime.now().strftime("%d-%m-%Y%H-%M-%S")
        final_output_path = self.output_path + 'cryptoData' + dt_string + '.csv'

        rows = self.json_data_collection
        # Union of keys in first-seen order, so every value lands under its own
        # column even if a record has different or differently ordered keys.
        fieldnames = list(dict.fromkeys(key for row in rows for key in row))

        # The context manager closes the file even if a write fails.
        with open(final_output_path, 'w', newline='') as data_file:
            csv_writer = csv.DictWriter(data_file, fieldnames=fieldnames)
            csv_writer.writeheader()
            csv_writer.writerows(rows)

        # reset value
        self.json_data_collection = []

    def on_open(self, ws):
        """Start the flush timer and send the SUBSCRIBE request for ``streams``."""
        print("Binance connected ...")
        self.prev_date = datetime.now()
        try:
            subscribe = {"method": "SUBSCRIBE", "params": self.streams, "id": 1}
            ws.send(json.dumps(subscribe))
        except Exception as e:
            # logging.ERROR is the integer level constant and is not callable;
            # logging.exception records the message plus the traceback.
            logging.exception(e)
            print(e)

    def on_message(self, ws, message):
        """Buffer the candle if the message carries a closed kline, then flush if due.

        Args:
            ws: The websocket the message arrived on (unused).
            message: JSON text received from the socket.
        """
        json_message = json.loads(message)

        # Only dict payloads with a 'k' entry carry a candle; anything else
        # (e.g. the subscription acknowledgement) is ignored.
        if isinstance(json_message, dict) and 'k' in json_message:
            candle = json_message['k']
            if candle.get('x'):
                print(f'inisde is candle closed with candle value = {candle}' )### inside isCandle closed
                self.json_data_collection.append(candle)

        # Custom scheduling: dump the buffer once flush_minutes have elapsed.
        # total_seconds() is used because .seconds discards whole days.
        self.cur_date = datetime.now()
        elapsed_minutes = (self.cur_date - self.prev_date).total_seconds() / 60
        if elapsed_minutes >= self.flush_minutes:
            self.write_to_csv()
            self.prev_date = self.cur_date

    def on_error(self, ws, error):
        """Log and print an error reported by the websocket client."""
        logging.error(error)
        print(error)

    def on_close(self, ws, close_status_code=None, close_msg=None):
        """Report the closed connection and flush any candles still buffered.

        The two optional arguments accept the close code and message that
        newer websocket-client releases pass to this callback; the original
        one-argument call still works.
        """
        print("Connection Closed!")
        # Without this, candles gathered since the last periodic dump are lost.
        self.write_to_csv()

    def run(self):
        """Create the WebSocketApp for the Binance data stream and block in run_forever()."""
        socket = 'wss://data-stream.binance.com/ws'
        self.ws = websocket.WebSocketApp(socket, on_open=self.on_open,
                                         on_close=self.on_close,
                                         on_message=self.on_message,
                                         on_error=self.on_error)
        self.ws.run_forever()


# Kline interval shared by every subscribed stream.
interval = '1m'

# One "<symbol>@kline_<interval>" stream name per trading pair, in subscription order.
streams = [
    f"{symbol}@kline_{interval}"
    for symbol in (
        "ethusdt", "btcusdt", "xrpusdt", "bnbusdt", "dogeusdt",
        "adausdt", "maticusdt", "dotusdt", "solusdt", "usdcusdt",
    )
]

# TODO: fix logging
# Only ERROR and above are recorded, in GetCryptoData.log.
logging.basicConfig(
    filename='GetCryptoData.log',
    level=logging.ERROR,
    format='%(asctime)s:%(levelname)s:%(message)s',
)

# Prefix for the generated CSV files (trailing slash required).
file_path = '/Users/pratyushpradhan/Developer/Personal/Projects/Crypto/RealTimeData/'
obj = CryptoExtractSchedule(streams, interval, file_path)
# Blocks until the websocket loop ends.
temp = obj.run()

Binance connected ...
inisde is candle closed with candle value = {'t': 1670354760000, 'T': 1670354819999, 's': 'XRPUSDT', 'i': '1m', 'f': 492378736, 'L': 492378787, 'o': '0.38910000', 'c': '0.38930000', 'h': '0.38930000', 'l': '0.38910000', 'v': '66603.00000000', 'n': 52, 'x': True, 'q': '25921.52200000', 'V': '47942.00000000', 'Q': '18659.63050000', 'B': '0'}
inisde is candle closed with candle value = {'t': 1670354760000, 'T': 1670354819999, 's': 'BTCUSDT', 'i': '1m', 'f': 2296318490, 'L': 2296321021, 'o': '16949.98000000', 'c': '16958.91000000', 'h': '16960.48000000', 'l': '16949.59000000', 'v': '101.49770000', 'n': 2532, 'x': True, 'q': '1720961.56041670', 'V': '53.76396000', 'Q': '911617.62360410', 'B': '0'}
inisde is candle closed with candle value = {'t': 1670354760000, 'T': 1670354819999, 's': 'ETHUSDT', 'i': '1m', 'f': 1037951158, 'L': 1037951394, 'o': '1246.41000000', 'c': '1247.40000000', 'h': '1247.41000000', 'l': '1246.41000000', 'v': '165.74510000', 'n': 237, 'x': True, 

In [2]:
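# %pip install websocket-client  # provides the `websocket` module with WebSocketApp; the PyPI package named "websocket" is a different, gevent-based project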


Collecting websocket
  Using cached websocket-0.2.1-py3-none-any.whl
Collecting greenlet
  Using cached greenlet-2.0.1-cp310-cp310-macosx_11_0_arm64.whl
Collecting gevent
  Using cached gevent-22.10.2.tar.gz (6.6 MB)
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Installing backend dependencies ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hCollecting zope.event
  Using cached zope.event-4.5.0-py2.py3-none-any.whl (6.8 kB)
Collecting zope.interface
  Using cached zope.interface-5.5.2-cp310-cp310-macosx_11_0_arm64.whl (210 kB)
Building wheels for collected packages: gevent
  Building wheel for gevent (pyproject.toml) ... [?25ldone
[?25h  Created wheel for gevent: filename=gevent-22.10.2-cp310-cp310-macosx_11_0_arm64.whl size=1711997 sha256=1f705d4834597756229b567b511a23d8f07964b68b7031d99b5e44a4056eabcd
  Stored in directory: /Users/pratyushpradhan/Library/Caches/pip/wheels/a4/de/5d/49a