In [1]:
# x.py
from helper import *
from nse_func import *

import nest_asyncio
nest_asyncio.apply()

# Do assignments
a = assign_var('nse')
for v in a:
    exec(v)

from ib_insync import *
util.startLoop()

# ib =  get_connected('nse', 'live')

with open(logpath+'ztest.log', 'w'):
    pass # clear the run log

util.logToFile(logpath+'ztest.log')

In [2]:
# get_lots.py
def get_lots():
    '''Get the lots with expiry dates
    Arg: None
    Returns: lots dataframe with expiry as YYYYMM''' 

    url = 'https://www.nseindia.com/content/fo/fo_mktlots.csv'
    req = requests.get(url)
    data = StringIO(req.text)
    lots_df = pd.read_csv(data)

    lots_df = lots_df[list(lots_df)[1:5]]

    # strip whitespace from columns and make it lower case
    lots_df.columns = lots_df.columns.str.strip().str.lower() 

    # strip all string contents of whitespaces
    lots_df = lots_df.applymap(lambda x: x.strip() if type(x) is str else x)

    # remove 'Symbol' row
    lots_df = lots_df[lots_df.symbol != 'Symbol']

    # melt the expiries into rows
    lots_df = lots_df.melt(id_vars=['symbol'], var_name='expiryM', value_name='lot').dropna()

    # remove rows without lots
    lots_df = lots_df[~(lots_df.lot == '')]

    # convert expiry to period
    lots_df = lots_df.assign(expiryM=pd.to_datetime(lots_df.expiryM, format='%b-%y').dt.to_period('M'))

    # convert lots to integers
    lots_df = lots_df.assign(lot=pd.to_numeric(lots_df.lot, errors='coerce'))
    
    # convert & to %26
    lots_df = lots_df.assign(symbol=lots_df.symbol.str.replace('&', '%26'))

    return lots_df.reset_index(drop=True)

In [3]:
%%time
# symbols_and_lots.py
# get the symbols and lots
df_lots = get_lots()
symbols = sorted(list(df_lots.symbol.unique()))

Wall time: 1.24 s


In [5]:
url = 'https://www.nseindia.com/live_market/dynaContent/live_watch/option_chain/optionKeys.jsp?symbol='
url_list = [url+symbol for symbol in symbols]

In [10]:
import ssl
async def fetch(session, url):
    async with session.get(url, ssl=ssl.SSLContext()) as response:
        await asyncio.sleep(0.1)
        return await response.text()


async def fetch_all(urls, loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        results = await asyncio.gather(*[fetch(session, url) for url in urls], return_exceptions=True)
        return results


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    urls = url_list
    htmls = loop.run_until_complete(fetch_all(urls, loop))
    print(htmls)

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)






In [None]:
# generates the coroutines
tasks = [asyncio.create_task(get_xu_async(s)) 
         for s in symbols]

# run the coroutines
dfs = asyncio.run(df_when_done(tasks))

In [2]:
# get_xu_async.py
async def get_xu_async(symbol: str) -> pd.DataFrame():
    '''Gets the symbol, expiry, undPrice'''
    # get expiries for the symbol
    url = 'https://www.nseindia.com/live_market/dynaContent/live_watch/option_chain/optionKeys.jsp?symbol='
    xpd = "//*[@id='date']" # xpath for date select options
    xpu = "//*[@id='whttp://localhost:8888/notebooks/bin/z_nse_scrape_async.ipynb#rapper_btm']/table[1]/tr/td[2]/div/span[1]/b" # xpath for undPrice

    print(f'Started aiohttp for {symbol}')
    
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(url+symbol) as resp:
            assert resp.status == 200
            res = await resp.text()
            print(f'Completed aiohttp for {symbol}')
            
            return res

async def df_when_done(tasks):
    from lxml import html
    dfs = []
    for res in limited_as_completed(tasks, 20):
        htree = await html.fromstring(res) #html is from lxml 
        expiries = [opt.text for e in htree.xpath(xpd) for opt in e if 'Select' not in opt.text.strip('')]
        undPrice = [float(e.text.split(' ')[1]) for e in htree.xpath(xpu)][0]

        # convert above to a DataFrame
        df = pd.DataFrame(list(product([symbol], expiries, [str(undPrice)])), 
                          columns=['symbol', 'expiry', 'undPrice'])
        df = df.apply(pd.to_numeric, errors = 'ignore')
        print('df done!')
        dfs.append(df)
    return dfs

import asyncio
from itertools import islice

def limited_as_completed(coros, limit):
    """
    Run the coroutines (or futures) supplied in the
    iterable coros, ensuring that there are at most
    limit coroutines running at any time.
    Return an iterator whose values, when waited for,
    are Future instances containing the results of
    the coroutines.
    Results may be provided in any order, as they
    become available.
    """
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]
    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures) > 0:
        yield first_to_finish()

In [None]:
dfs

In [None]:
%%time
# run just one get_xu_async - with run_until_complete
loop = asyncio.get_event_loop()
htmls=loop.run_until_complete(get_xu_async('ACC'))

In [None]:
%%time
# run all scrips - with gather
tasks = [asyncio.create_task(get_xu_async(s)) for s in symbols]
g = asyncio.gather(*tasks)

htmls = asyncio.run(g)

In [None]:
len(htmls)

In [None]:
%%time
# run two scrips - with gather
sym2 = ['ACC', 'PNB']
tasks = [asyncio.create_task(get_xu_async(s)) for s in sym2]
g = asyncio.gather(*tasks)

htmls = asyncio.run(g)

In [None]:
async def main():
#     tasks = [asyncio.create_task(get_xu_async(s)) for s in symbols]

    # Schedule all the tasks *concurrently*
    await asyncio.gather(*[asyncio.create_task(get_xu_async(s)) for s in symbols])
    print(f"started at {time.strftime('%X')}")

dfs = asyncio.run(main())

In [None]:
async def fetch(client, url):
    async with client.get(url) as resp:
        assert resp.status == 200
        return await resp.text()

async def main(urls):
    async with aiohttp.ClientSession(headers=headers) as client:
        tasks = await [fetch(client, u) for u in urls]
        return html
        
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

In [6]:
#!/usr/local/bin/python3.5
import asyncio
from aiohttp import ClientSession

async def fetch(url, session):
    async with session.get(url) as response:
        return await response.read()

async def run(r):
    url = 'https://www.nseindia.com/live_market/dynaContent/live_watch/option_chain/optionKeys.jsp?symbol={}'
    tasks = []

    # Fetch all responses within one Client session,
    # keep connection alive for all requests.
    async with ClientSession() as session:
        for i in symbols[:10]:
            task = asyncio.ensure_future(fetch(url.format(i), session))
            tasks.append(task)

        responses = await asyncio.gather(*tasks)
        # you now have all response bodies in this variable
        return responses
#         print(responses)

def print_responses(result):
    print(result)

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(4))
loop.run_until_complete(future)

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)




