In [3]:
# Collects live market data (candles and order books) for a machine/deep learning model that predicts the next price movement.
import asyncio
import numpy as np
import time
import tinvest as ti
import pandas as pd
# Tinkoff API access and the instrument to collect data for.
# The token is read from the environment (or prompted for interactively) so it
# never ends up in the notebook source or its saved outputs.
import getpass
import os

TICKER = 'MRNA'
TOKEN_SANDBOX = os.environ.get('TINKOFF_SANDBOX_TOKEN') or getpass.getpass('Tinkoff sandbox token: ')
client = ti.SyncClient(TOKEN_SANDBOX, use_sandbox=True)
search_results = client.get_market_search_by_ticker(TICKER).payload.instruments
if not search_results:
    # Fail with a readable message instead of an IndexError on instruments[0].
    raise LookupError('No instrument found for ticker {!r}'.format(TICKER))
FIGI = search_results[0].figi

In [5]:
# Data-collection settings.
candle_resolution = ti.CandleResolution.min1  # candle interval requested from the candle stream
duration = 10 * 60 * 60  # total collection time in seconds (10 hours)
sample_time = 60         # seconds covered by one sample; must be a multiple of `period`
period = 3               # seconds between consecutive snapshots inside a sample
depth = 20               # order-book depth requested from the order-book stream

# Streaming coroutine: keeps a live pool with the latest candle, order-book and instrument-info events.
async def stream(token, figi, collect_duration, candle_resolution, depth, countdown=None):
    """Stream live events for one instrument into the module-level ``cur_events`` pool.

    Subscribes to the candle, order-book and instrument-info streams for ``figi``.
    It keeps only the most recent event of each kind in the global object array
    ``cur_events``: slot 0 = candle, 1 = order book, 2 = instrument info.
    Each slot holds the integer 0 until the first event of that kind arrives.

    Args:
        token: Tinkoff API token passed to ``ti.Streaming``.
        figi: FIGI of the instrument to subscribe to.
        collect_duration: length of the collection window, in seconds.
        candle_resolution: candle interval for the candle subscription.
        depth: depth for the order-book subscription.
        countdown: seconds until collection starts. ``None`` (the default) means
            "until the next full UTC minute" and is computed when the coroutine is
            called. The old default was evaluated only once, at definition time,
            so it went stale.

    The stream is stopped once ``collect_duration + countdown + 5`` seconds have
    passed since the call. That check runs only when an event arrives.
    A ``asyncio.TimeoutError`` inside the stream prints 'Stream stopped!'.
    """
    global cur_events
    cur_events = np.zeros(3, dtype=object)
    if countdown is None:
        countdown = 60 - time.time() % 60
    # Stop a few seconds after the collector's window ends (presumably so its last
    # tick still has fresh data to read).
    stop_after = collect_duration + countdown + 5
    start_time = time.time()
    # Maps str(event.event) to the slot in cur_events that stores that event kind.
    slot_by_event = {'Event.candle': 0, 'Event.orderbook': 1, 'Event.instrument_info': 2}
    async with ti.Streaming(token) as streaming:
        try:
            await streaming.candle.subscribe(figi, candle_resolution)
            await streaming.orderbook.subscribe(figi, depth)
            await streaming.instrument_info.subscribe(figi)
            async for event in streaming:
                slot = slot_by_event.get(str(event.event))
                if slot is not None:
                    cur_events[slot] = event
                if time.time() - start_time >= stop_after:
                    await streaming.stop()
        except asyncio.TimeoutError:
            print('Stream stopped!')

# Collecting coroutine: periodically samples the latest events published by the concurrent streaming coroutine.
async def collect(duration,depth,sample_time,period,countdown=60-time.gmtime().tm_sec-time.time()%1):
    if sample_time%period==0:
        global X,gmtimes
        X,gmtimes=[],[]
        await asyncio.sleep(countdown)
        start_time = time.time()
        current = 0
        sample_num = 0
        parts_number = int(round(sample_time/period))
        sample = np.zeros(((2*depth+1)*parts_number))
        part_vars = []
        for i in range(parts_number):
            locals()['part' + str(i)] = i
            part_vars.append(locals()['part' + str(i)])
        while time.time() <= start_time + duration:
            if (cur_events[2].payload.trade_status=='normal_trading'):
                if current!=0:
                    last = round(current)
                else:
                    last = time.time() - period
                gmtime = time.gmtime()
                current = time.time()
                if round(gmtime.tm_sec+current%1)==60:
                    part_num=0
                else:
                    part_num = int(round(gmtime.tm_sec+current%1)//period)
                #print('{}:{}:{:.2f}'.format(gmtime.tm_hour,gmtime.tm_min,gmtime.tm_sec+time.time()%1))
                # sample[parts_number+(2*depth)*part_num:parts_number+(2*depth)*part_num+depth] = np.array(cur_events[1].payload.bids)[:,1]
                # sample[parts_number+(2*depth)*part_num+depth:parts_number+(2*depth)*part_num+2*depth] = np.array(cur_events[1].payload.asks)[:,1]
                # sample[part_num] = cur_events[0].payload.c
                part_vars[part_num] = np.hstack([np.array(cur_events[0].payload.c),np.array(cur_events[1].payload.bids)[:,1],np.array(cur_events[1].payload.asks)[:,1]])
                if (part_num==int((parts_number-1))):
                    sample = np.hstack(part_vars)
                    if len(sample)==(2*depth+1)*parts_number:
                        gmtimes.append(gmtime)
                        X.append(sample)
                        print('Sample #{} added'.format(sample_num))
                        sample_num+=1
            else:
                print('Not a good time for trading')
            await asyncio.sleep(period-(current-last-period))
        print('Collection complete!')
    else:
        print('Sample time must be a multiple of the period')

# Coroutine that runs concurrent coroutines
async def gather_tasks(token, figi, collect_duration, candle_resolution, depth, sample_time, period):
    """Run ``stream`` and ``collect`` concurrently and wait for both to finish.

    The start delay (seconds until the next full UTC minute) is computed once, at
    call time, and passed explicitly to both coroutines. That way they agree on
    when collection begins, rather than relying on each coroutine's own default.

    If either task raises, or this coroutine is cancelled (e.g. by interrupting the
    notebook cell), both tasks are cancelled before the exception propagates.
    Neither is left running in the background.
    """
    countdown = 60 - time.time() % 60
    tasks = [
        asyncio.create_task(stream(token, figi, collect_duration, candle_resolution, depth, countdown=countdown)),
        asyncio.create_task(collect(collect_duration, depth, sample_time, period, countdown=countdown)),
    ]
    try:
        await asyncio.gather(*tasks)
    except BaseException:
        # gather() does not cancel sibling tasks on failure; do it explicitly.
        for task in tasks:
            task.cancel()
        raise

# Run event loop with concurrent tasks
#loop = asyncio.get_event_loop()
#loop.run_until_complete(gather_tasks(TOKEN_SANDBOX,FIGI,duration,candle_resolution,depth,sample_time,period))
#loop.close()
await gather_tasks(TOKEN_SANDBOX,FIGI,duration,candle_resolution,depth,sample_time,period)

# Make csv with collected data
np_X = np.array(X)
df_X = pd.DataFrame(data=np_X,index=gmtimes)
#print(df_X)
df_X.to_csv('df_X_MRNA.csv')

Sample #0 added
Sample #1 added
Sample #2 added
Sample #3 added
Sample #4 added
Sample #5 added
Sample #6 added
Collection complete!


In [8]:
import datetime as dt

In [10]:
# Save the collected samples for SPCE, indexed by a reconstructed timeline of one
# sample per minute from a known start time. The collected `gmtimes` is left
# untouched.
# NOTE(review): '04-05-2021' is ambiguous. pandas parses it month-first by default
# (2021-04-05); pass dayfirst=True to pd.to_datetime if 4 May 2021 was meant.
np_X = np.array(X)
sample_index = pd.date_range(pd.to_datetime('10:33:00 04-05-2021'), periods=len(X), freq='min')
df_X = pd.DataFrame(data=np_X, index=sample_index)
df_X.to_csv('df_X_SPCE.csv')