# Async practice

In [20]:
import asyncio
import nest_asyncio
nest_asyncio.apply()
import time

from binance import AsyncClient

In [7]:
async def main():

    client = await AsyncClient.create()
    res = await asyncio.gather(
        client.get_exchange_info(),
        client.get_all_tickers()
    )

if __name__ == "__main__":

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fe90df614c0>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7fe90e070d60>, 116.100399893), (<aiohttp.client_proto.ResponseHandler object at 0x7fe90f03d580>, 116.127259999)]']
connector: <aiohttp.connector.TCPConnector object at 0x7fe90df610a0>


In [14]:
import asyncio
import json

from binance import AsyncClient, DepthCacheManager, BinanceSocketManager

In [18]:
async def main():

    # initialise the client
    client = await AsyncClient.create()

    # run some simple requests
    print(json.dumps(await client.get_exchange_info(), indent=2))

    print(json.dumps(await client.get_symbol_ticker(symbol="BTCUSDT"), indent=2))

    # initialise websocket factory manager
    bsm = BinanceSocketManager(client)

    # create listener using async with
    # this will exit and close the connection after 5 messages
    async with bsm.trade_socket('ETHBTC') as ts:
        for _ in range(5):
            res = await ts.recv()
            print(f'recv {res}')

    # get historical kline data from any date range

    # fetch 1 minute klines for the last day up until now
#     klines = client.get_historical_klines("BNBBTC", AsyncClient.KLINE_INTERVAL_1MINUTE, "1 day ago UTC")

    # use generator to fetch 1 minute klines for the last day up until now
#     async for kline in await client.get_historical_klines_generator("BNBBTC", AsyncClient.KLINE_INTERVAL_1MINUTE, "1 day ago UTC"):
#         print(kline)

    # fetch 30 minute klines for the last month of 2017
#     klines = client.get_historical_klines("ETHBTC", Client.KLINE_INTERVAL_30MINUTE, "1 Dec, 2017", "1 Jan, 2018")

    # fetch weekly klines since it listed
    klines = client.get_historical_klines("NEOBTC", client.KLINE_INTERVAL_1WEEK, "1 Jan, 2017")

    # setup an async context the Depth Cache and exit after 5 messages
    async with DepthCacheManager(client, symbol='ETHBTC') as dcm_socket:
        for _ in range(5):
            depth_cache = await dcm_socket.recv()
            print(f"symbol {depth_cache.symbol} updated:{depth_cache.update_time}")
            print("Top 5 asks:")
            print(depth_cache.get_asks()[:5])
            print("Top 5 bids:")
            print(depth_cache.get_bids()[:5])

    # Vanilla options Depth Cache works the same, update the symbol to a current one
    options_symbol = 'BTC-210430-36000-C'
    async with OptionsDepthCacheManager(client, symbol=options_symbol) as dcm_socket:
        for _ in range(5):
            depth_cache = await dcm_socket.recv()
            count += 1
            print(f"symbol {depth_cache.symbol} updated:{depth_cache.update_time}")
            print("Top 5 asks:")
            print(depth_cache.get_asks()[:5])
            print("Top 5 bids:")
            print(depth_cache.get_bids()[:5])

    await client.close_connection()

In [None]:
if __name__ == "__main__":

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

In [37]:
import pandas as pd

ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fe90dfd54c0>
ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fe90dc59550>
  from pandas.core.groupby.groupby import (


In [58]:
async def test(x):
#     return pd.read_csv(f"../data/minute/{x}-minute.csv")
    time.sleep(x)
    return 1

In [36]:
pd.read_csv("../data/minute/AIONUSDT-minute.csv")

NameError: name 'pd' is not defined

In [60]:
start = time.time()
await test(2)
await test(2)
await test(2)
time.time()-start

6.01359486579895

In [61]:
start = time.time()
async def test2():
    res = await asyncio.gather(
        asyncio.create_task(test(2)),
        asyncio.create_task(test(2)),
        asyncio.create_task(test(2)),
    )
    return res
time.time() - start

0.0006070137023925781

In [62]:
start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(test2())
time.time() - start

6.014945983886719

In [31]:
res

[10, 11]

In [28]:
start = time.time()
y = await test(5) + await test(5)
time.time() - start

6.010514974594116

In [29]:
y

20

In [94]:
start = time.time()
import asyncio
a = 0
async def task(name, work_queue):
    global a
    while not work_queue.empty():
        num = await work_queue.get()
        print(f"add {num} to a:{a}")
        await asyncio.sleep(num)
        a += num
        print(f"a is now {a}")

async def main():
    """
    This is the main entry point for the program
    """
    global a
    work_queue = asyncio.Queue()

    for work in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
        await work_queue.put(work)

    await asyncio.gather(
        asyncio.create_task(task("One", work_queue)),
        asyncio.create_task(task("Two", work_queue)),
        asyncio.create_task(task("Three", work_queue)),
        asyncio.create_task(task("One", work_queue)),
        asyncio.create_task(task("Two", work_queue)),
        asyncio.create_task(task("Three", work_queue)),
    )
    
    print(f"a is {a}, now add 100")
    a += 100
    print(f"a is {a}")

if __name__ == "__main__":
    asyncio.run(main())
    
print(f"It took {time.time() - start} seconds")

add 1 to a:0
add 2 to a:0
add 3 to a:0
add 4 to a:0
add 5 to a:0
add 6 to a:0
a is now 1
add 7 to a:1
a is now 3
add 8 to a:3
a is now 6
add 9 to a:6
a is now 10
add 10 to a:10
a is now 15
a is now 21
a is now 28
a is now 36
a is now 45
a is now 55
a is 55, now add 100
a is 155
It took 14.012741088867188 seconds


In [92]:
a

55

In [98]:
start = time.time()
import asyncio
async def task(name, work_queue):
    global a
    while not work_queue.empty():
        name = await work_queue.get()
        print(f"reading {name}")
        await pd.read_csv(f"../data/minute/{name}-minute.csv")
        print(f"read {name}")

async def main():
    """
    This is the main entry point for the program
    """
    global a
    work_queue = asyncio.Queue()

    for work in ["AIONUSDT", "ARDRUSDT", "ANKRUSDT", "BATUSDT"]:
        await work_queue.put(work)

    await asyncio.gather(
        asyncio.create_task(task("One", work_queue)),
        asyncio.create_task(task("Two", work_queue)),
    )

if __name__ == "__main__":
    asyncio.run(main())
    
print(f"It took {time.time() - start} seconds")

reading AIONUSDT
reading ARDRUSDT


TypeError: object DataFrame can't be used in 'await' expression