## Setup for Websocket Connection (Binance & Coinbase Pro)

In [None]:
import asyncio
from datetime import datetime, timedelta
import sys
#from binance.client import AsyncClient
# from binance import BinanceSocketManager, ThreadedWebsocketManager
import configparser
import utils
from websocket import create_connection
import websockets

import nest_asyncio
nest_asyncio.apply()

# Subscription payloads, sent verbatim as the first frame on each socket.
# BINANCE_SYMBOL = 'btcusdt'
# COINBASE_SYMBOL = 'BTC-USD'
# NOTE(review): Binance interval strings are believed to be case-sensitive --
# "1M" looks like the monthly candle and "1m" the one-minute candle; confirm
# which one is intended here.
BINANCE_SUBSCRIBE = '{ "method": "SUBSCRIBE", "params": [ "btcusdt@kline_1M"], "id": 1 }'
# Coinbase: subscribe to the "ticker" channel for BTC-USD.
COINBASE_SUBSCRIBE = '{ "type": "subscribe", "product_ids": ["BTC-USD"], "channels": [ "ticker"] }'

# Websocket endpoints are read from ../config.ini (relative to the working
# directory). ConfigParser.read() skips files it cannot open, so a missing
# file surfaces as a KeyError on the section lookups below.
config = configparser.ConfigParser()
config.read('../config.ini')

# Endpoint URLs for the Binance and Coinbase feeds respectively.
B_WSS = config['BINANCE']['WS_URL']
C_WSS = config['COINBASE']['WS_URL']

## Websocket Testing for Binance (1)

In [None]:
# Open a blocking websocket-client connection to Binance and subscribe.
ws = create_connection(B_WSS)
try:
    ws.send(BINANCE_SUBSCRIBE)

    # Print every frame until the cell is interrupted or the socket fails.
    while True:
        print(ws.recv())
finally:
    # The loop only ends through an exception (e.g. KeyboardInterrupt when
    # the cell is stopped), so release the socket on every exit path.
    ws.close()



## Websocket Testing for Binance (2)

In [None]:
# Async variant of the Binance test using the `websockets` library.
# The top-level `async with` / `await` below only works in an environment
# that supports top-level await (e.g. IPython / Jupyter).
connection = websockets.connect(uri=B_WSS)

async with connection as websocket:
    # Send the subscription request.
    await websocket.send(BINANCE_SUBSCRIBE)

    # Print each message as it arrives; the loop ends when the connection closes.
    async for message in websocket:
        print(message)
        
    # Explicit close; only reached once the receive loop above has finished.
    await websocket.close()


## Websocket Testing for Coinbase Pro (1)

In [None]:
# Connect to the Coinbase websocket endpoint and subscribe with
# COINBASE_SUBSCRIBE (the "ticker" channel for BTC-USD).
# The top-level `async with` / `await` below only works in an environment
# that supports top-level await (e.g. IPython / Jupyter).
connection = websockets.connect(uri=C_WSS)

async with connection as websocket:
    # Send the subscription request.
    await websocket.send(COINBASE_SUBSCRIBE)

    # Print each message as it arrives; the loop ends when the connection closes.
    async for message in websocket:
        print(message)
        
    # Explicit close; only reached once the receive loop above has finished.
    await websocket.close()

## Websocket Testing for Coinbase Pro (2)

In [None]:
async def work(uri, subscribe):
    """Subscribe on a websocket and print every message it delivers.

    Args:
        uri: Websocket endpoint to connect to.
        subscribe: Subscription payload sent once, right after connecting.
    """
    async with websockets.connect(uri=uri) as sock:
        await sock.send(subscribe)

        # Iteration stops when the connection is closed.
        async for frame in sock:
            print(frame)

        await sock.close()


# Run the Coinbase test to completion. asyncio.run() manages the loop's
# lifecycle itself, so this cell no longer fetches the current event loop and
# closes it afterwards -- inside a notebook that loop is the shared one that
# later cells still need. Matches the asyncio.run(main()) call used further
# down in this notebook.
asyncio.run(work(C_WSS, COINBASE_SUBSCRIBE))

## Multi Websocket Connection Testing for Binance & Coinbase Pro

In [None]:
import json
from collections import deque

import utils

# Capacity of the rolling buffer below.
MAXLEN = 100

# Bounded buffer: once MAXLEN items are stored, appending drops the oldest.
# Neither container is used by the code in this cell yet.
price_data = deque(maxlen=MAXLEN)
data = []

# Endpoints and their subscription payloads, paired by position.
urls = [B_WSS, C_WSS]
sub_texts = [BINANCE_SUBSCRIBE, COINBASE_SUBSCRIBE]

# Latest price seen per exchange. Shared by every socket_communication() task:
# each task only ever receives one exchange's messages, so per-task locals can
# never hold both sides of the price gap.
_latest_prices = {'binance': None, 'copro': None}


async def socket_communication(url, sub_text):
    """Subscribe to one exchange feed and print each price update.

    Messages containing an ``'e'`` key are handled as Binance kline events;
    messages whose ``'type'`` is ``'ticker'`` are handled as Coinbase ticker
    events. After each price update the Coinbase-minus-Binance gap is printed,
    provided a price has been seen on both feeds.

    Args:
        url: Websocket endpoint to connect to.
        sub_text: Subscription payload sent once, right after connecting.
    """
    # `async with` closes the connection on exit, so no explicit close() is needed.
    async with websockets.connect(uri=url) as websocket:
        await websocket.send(sub_text)
        print(f'connected to {url}')

        async for message in websocket:
            m = json.loads(message)
            updated = False

            if 'e' in m:
                # Convert the event timestamp once and reuse it for hour/minute.
                binance_time = utils.binance_ts_to_dt(m['E'])
                binance_price = m['k']['c']
                _latest_prices['binance'] = binance_price
                updated = True
                print(f'binance: {binance_time.hour} {binance_time.minute} price {binance_price}')

            if 'type' in m and m['type'] == 'ticker':
                # Drop the fractional seconds before parsing the timestamp.
                copro_time = datetime.strptime(m['time'].split(".")[0], utils.COPRO_DATETIME_FORMAT)
                copro_price = m['price']
                _latest_prices['copro'] = copro_price
                updated = True
                print(f'copro: {copro_time.hour} {copro_time.minute} price {copro_price}')

            latest_binance = _latest_prices['binance']
            latest_copro = _latest_prices['copro']
            # Only compare once both feeds have reported, and only on a fresh price.
            if updated and latest_binance is not None and latest_copro is not None:
                gap = float(latest_copro) - float(latest_binance)
                print(f'price gap: {gap}: {latest_copro} - {latest_binance}')

            # Yield to the event loop so the other feed's task gets a turn.
            await asyncio.sleep(0)


# async def processing():
#     while True: 
#         asyncio.sleep(0)
#         print('Do something')


async def main():
    """Run one socket_communication() task per configured feed, concurrently.

    Each entry of ``urls`` is paired with the entry of ``sub_texts`` at the
    same position, so the number of feeds follows the lists instead of being
    fixed at two.
    """
    tasks = [
        socket_communication(url, sub_text)
        for url, sub_text in zip(urls, sub_texts)
    ]

    # Debug output: shows the coroutine objects that are about to be scheduled.
    print(tasks)
    await asyncio.gather(*tasks)

# con_binance = websockets.connect(uri=urls[0])
# con_copro = websockets.connect(uri=urls[1])





# Start all feeds; returns only once every task awaited in main() has finished.
# NOTE(review): presumably relies on the earlier nest_asyncio.apply() when run
# inside a notebook whose event loop is already running -- confirm.
asyncio.run(main())

In [None]:
# Endpoints and their subscription payloads, paired by position
# (index 0 uses the Binance values, index 1 the Coinbase values).
urls = [B_WSS, C_WSS]
sub_texts = [BINANCE_SUBSCRIBE, COINBASE_SUBSCRIBE]


async def socket_communication(url, sub_text):
    """Subscribe on one websocket and echo every message with its source URL.

    Args:
        url: Websocket endpoint to connect to.
        sub_text: Subscription payload sent once, right after connecting.
    """
    async with websockets.connect(uri=url) as sock:
        await sock.send(sub_text)
        print(f'connected to {url}')

        async for frame in sock:
            print(url, frame)
            # Hand control back to the event loop between messages.
            await asyncio.sleep(0)

        await sock.close()


async def processing():
    """Placeholder task: yield to the event loop once, then print a marker."""
    # The sleep has to be awaited. A bare asyncio.sleep(0) call only creates a
    # coroutine object that never runs and triggers a "never awaited" warning.
    await asyncio.sleep(0)
    print('Do something')


async def main():
    """Run every feed task plus the processing() task concurrently.

    Each entry of ``urls`` is paired with the entry of ``sub_texts`` at the
    same position, so the number of feeds follows the lists instead of being
    fixed at two.
    """
    tasks = [
        socket_communication(url, sub_text)
        for url, sub_text in zip(urls, sub_texts)
    ]
    tasks.append(processing())

    # Debug output: shows the coroutine objects that are about to be scheduled.
    print(tasks)
    await asyncio.gather(*tasks)

con_binance = websockets.connect(uri=urls[0])
con_copro = websockets.connect(uri=urls[1])

# async with con_binance as websocket_binance:
await con_binance.send(sub_texts[0])
print(f'connected to {url}')

# async with con_copro as websocket_copro:
await con_copro.send(sub_texts[1])
print(f'connected to {url}')

In [None]:
import websocket

def on_message(ws, message):
    """WebSocketApp callback: print each message received; ``ws`` is unused."""
    print(message)

def on_error(ws, error):
    """WebSocketApp callback: print the reported error; ``ws`` is unused."""
    print(error)

def on_close(ws, close_status_code=None, close_msg=None):
    """WebSocketApp callback: announce that the connection has closed.

    Current websocket-client releases call this with three arguments, while
    older ones pass only ``ws``; the defaults keep both call shapes working.

    Args:
        ws: The WebSocketApp instance (unused).
        close_status_code: Close code supplied by the library, if any (unused).
        close_msg: Close reason supplied by the library, if any (unused).
    """
    print("### closed ###")

def on_open(ws):
    """WebSocketApp callback: announce the connection, then subscribe.

    Args:
        ws: The connected WebSocketApp; the Binance subscription payload is
            sent through it.
    """
    print('Connected to websocket')
    ws.send(BINANCE_SUBSCRIBE)

# Callback-driven variant using websocket-client's WebSocketApp.
if __name__ == "__main__":
    # NOTE(review): the endpoint is hard-coded here, whereas the other cells
    # read it from config as B_WSS -- confirm whether they should match.
    ws = websocket.WebSocketApp(
        url="wss://stream.binance.com:9443/ws",
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
        on_open=on_open
    )

    # Blocks while the app runs, dispatching to the callbacks above.
    ws.run_forever()

In [1]:
import asyncio
from datetime import datetime, timedelta
import sys
#from binance.client import AsyncClient
# from binance import BinanceSocketManager, ThreadedWebsocketManager
import configparser
import utils
from websocket import create_connection
import websockets

import nest_asyncio
nest_asyncio.apply()

# Subscription payloads for the two exchange feeds.
# BINANCE_SYMBOL = 'btcusdt'
# COINBASE_SYMBOL = 'BTC-USD'
# NOTE(review): Binance interval strings are believed to be case-sensitive --
# "1M" looks like the monthly candle and "1m" the one-minute candle; confirm
# which one is intended here.
BINANCE_SUBSCRIBE = '{ "method": "SUBSCRIBE", "params": [ "btcusdt@kline_1M"], "id": 1 }'
# Coinbase: subscribe to the "ticker" channel for BTC-USD.
COINBASE_SUBSCRIBE = '{ "type": "subscribe", "product_ids": ["BTC-USD"], "channels": [ "ticker"] }'

# Websocket endpoints are read from ../config.ini (relative to the working
# directory). ConfigParser.read() skips files it cannot open, so a missing
# file surfaces as a KeyError on the section lookups below.
config = configparser.ConfigParser()
config.read('../config.ini')

# Endpoint URLs for the Binance and Coinbase feeds respectively.
B_WSS = config['BINANCE']['WS_URL']
C_WSS = config['COINBASE']['WS_URL']
from client import Binance, Coinbase

# Create one client per exchange, each pointed at its own endpoint.
# The Coinbase client takes C_WSS; it was previously handed the Binance URL.
binance = Binance(B_WSS, 'binance')
copro = Coinbase(C_WSS, 'copro')

# Start both clients.
binance.start()
copro.start()