# Refactoring exec_pool as a core engine
We are refactoring the execution of a pool of contracts by building a new `async exec_pool` function (implemented below as `executeAsync`).

`async exec_pool`:
1. processes sets of contracts with a specific algo
2. with controlled concurrency
3. with an option to post-process results into DataFrame outputs
   - which can be checkpointed to a pickle file...
   - ... so that a run can be `re-start`ed from near the point of failure


In [1]:
# Market this notebook works on: upper-cased for Vars, lower-cased for data/log paths.
MARKET = "NSE"

In [2]:
import sys
import os
import pathlib
import pandas as pd
import yaml
import asyncio

from ib_insync import IB, util, Option, MarketOrder, Contract
from typing import Callable, Coroutine, Union

In [3]:
# ** LOCAL IMPORTS
from engine import ohlc, chain, qualify, und, Vars

In [4]:
# ** JUPYTER SPECIFIC
# Only a Jupyter kernel (ZMQInteractiveShell) gets the nested-loop patches;
# IDE and command-line runs skip this branch entirely.
import IPython as ipy
if type(ipy.get_ipython()).__name__ == 'ZMQInteractiveShell':
    import nest_asyncio
    nest_asyncio.apply()  # allow re-entrant event loops inside the kernel
    util.startLoop()  # ib_insync helper for running its loop under Jupyter
    pd.options.display.max_columns = None  # show every DataFrame column

In [5]:
# ** SET PATHS
# Working directory python was launched from; every path below hangs off it.
cwd = pathlib.Path.cwd()

# ...data and log folders for local (learn).
# ! DUMMY for Jupyter. In a .py file use ``os.path.dirname(os.path.abspath(__file__))``
THIS_FOLDER = ''
LOGPATH = cwd.joinpath(THIS_FOLDER, "data", "log")
DATAPATH = cwd.joinpath(THIS_FOLDER, "data", MARKET.lower())

# ...make programs stored in the `asyncib` folder importable
IBPATH = cwd.parent.parent / 'asyncib'
if str(IBPATH) not in sys.path:  # sys.path holds strings, not Path objects
    sys.path.append(str(IBPATH))

IBDATAPATH = IBPATH / 'data' / MARKET.lower()

In [6]:
# ** SET VARIABLES & LOGS
ibp = Vars(MARKET.upper())

HOST, PORT, CID = ibp.HOST, ibp.PORT, ibp.CID

LOGFILE = LOGPATH.joinpath(MARKET.lower() + "_base.log")

# Start each session with a fresh log. Create the folder and truncate the
# file BEFORE attaching the file handler: the handler opens the file as soon
# as it is created, so it must not find the folder missing, and the file
# must not be truncated underneath an already-open handler.
LOGPATH.mkdir(parents=True, exist_ok=True)
with open(LOGFILE, "w"):
    pass
util.logToFile(path=LOGFILE, level=30)  # 30 == logging.WARNING

# The ``async exec_pool`` algo
### with concurrency control and post-processing checkpoints

In [7]:
# ** EXECUTION

# .make name for symbol being processed by the engine
def _ct_label(ct):
    """Label one contract as symbol + last 4 expiry chars + right + strike."""
    return ct.symbol + ct.lastTradeDateOrContractMonth[-4:] + ct.right + str(ct.strike)


def make_name(cts):
    """Generates name for contract(s).

    * iterable of contracts             -> list of labels, each suffixed '..'
    * single contract                   -> one label string
    * falsy non-iterable (e.g. None)    -> returned unchanged
    * iterable of (contract, x) pairs   -> list of labels of the first items
    * one (contract, x) pair            -> label of its contract
    """
    try:
        return [_ct_label(ct) + '..' for ct in cts]
    except TypeError:
        # not iterable: a lone contract, or an empty value passed through
        return _ct_label(cts) if cts else cts
    except AttributeError:
        # elements lack contract attributes: treat them as (contract, x) pairs
        try:
            return [_ct_label(pair[0]) + '..' for pair in cts]
        except TypeError:
            # elements are not subscriptable: cts itself is one pair
            return _ct_label(cts[0])

In [8]:
# .the core engine
async def executeAsync(
    ib: IB,  # the class itself: `IB()` here built a client at definition time
    algo: Callable[..., Coroutine],  # coro name
    cts: Union[Contract, pd.Series, list, tuple],  # contract(s) or (contract, order) pair(s)
    post_process: Callable[
        [set, pathlib.Path, str], pd.DataFrame
    ] = None,  # If checkpoint is needed
    DATAPATH: pathlib.Path = None,  # Necessary for post_process
    CONCURRENT: int = 40,  # adjust to prevent overflows
    TIMEOUT: Union[float, None] = None,  # if None, no progress messages shown
    OP_FILENAME: str = "",  # output file name
    **kwargs,  # keyword inputs for algo
):
    """Run ``algo`` over every input in ``cts`` with bounded concurrency.

    ``cts`` is normalised by ``pre_process`` into ``(contract, order_or_None)``
    items. They run in consecutive batches of at most ``CONCURRENT`` tasks;
    each task is ``algo(ib, item, **kwargs)`` named ``make_name(item)``.
    After every wait, the finished tasks are handed to ``post_process`` (if
    given) so results can be checkpointed.

    When ``TIMEOUT`` is set, progress is printed after each wait. If a batch's
    pending count does not shrink between two waits, its tasks get 5 more
    seconds. Tasks still pending after that are cancelled and not retried.

    Args:
        ib: IB client handed to ``algo``.
        algo: coroutine function called as ``algo(ib, item, **kwargs)``.
        cts: contract(s) or (contract, order) pair(s); see ``pre_process``.
        post_process: checkpoint hook called as
            ``post_process(results, DATAPATH, OP_FILENAME)``.
        DATAPATH: folder passed to ``post_process``.
        CONCURRENT: maximum tasks in flight (values below 1 act as 1).
        TIMEOUT: seconds per wait; falsy waits for a whole batch silently.
        OP_FILENAME: file name passed to ``post_process``.
        **kwargs: forwarded to ``algo``.

    Returns:
        The latest ``post_process`` output, or the set of finished tasks when
        no ``post_process`` is given. Empty input returns the post-processed
        empty result set instead of failing.
    """
    algo_name = getattr(algo, "__name__", repr(algo))
    # list(): pre_process may hand back a generator, which has no len()
    items = list(pre_process(cts) or ())
    total = len(items)
    batch_size = max(1, int(CONCURRENT))  # 0 would never schedule anything
    results = set()

    def _checkpoint():
        """Post-process (and possibly persist) every task finished so far."""
        if post_process:
            return post_process(results, DATAPATH, OP_FILENAME)
        return results

    if not items:
        return _checkpoint()

    output = results
    for start in range(0, total, batch_size):
        stop = start + batch_size
        # Map task -> input so bookkeeping never relies on task names,
        # which are not guaranteed to be unique across inputs.
        task_items = {
            asyncio.create_task(algo(ib, c, **kwargs), name=make_name(c)): c
            for c in items[start:stop]
        }
        pending = set(task_items)
        last_len_pending = 0  # pending count after the previous wait

        while pending:
            done, pending = await asyncio.wait(
                pending, timeout=TIMEOUT, return_when=asyncio.ALL_COMPLETED
            )
            results.update(done)
            output = _checkpoint()  # checkpoint the results

            if not TIMEOUT:
                continue

            # unfinished inputs of this batch, then the next few queued ones
            waiting = [c for t, c in task_items.items() if t in pending]
            waiting += items[stop:stop + 2]
            if waiting:
                done_names = [d.get_name() for d in done]
                print(
                    f"\nDone {algo_name} for {done_names[:2]} {len(results)} out of {total}. Pending {[make_name(c) for c in waiting[:2]]}"
                )

            # something wrong: no task finished since the previous wait
            if pending and len(pending) == last_len_pending:
                print(
                    "\n @ ALERT @: Tasks are not progressing. Pending tasks will be killed in 5 seconds !\n"
                )
                late, stuck = await asyncio.wait(pending, timeout=5.0)
                results.update(late)
                for task in stuck:
                    task.cancel()  # actually kill them, as the alert announces
                pending = set()
                output = _checkpoint()  # include the late finishers

            last_len_pending = len(pending)

    return output


def save_df(results: set, DATAPATH: pathlib.Path, file_name: str = "") -> pd.DataFrame:
    """Concatenate the DataFrames returned by finished tasks; optionally pickle.

    Args:
        results: asyncio tasks/futures, typically the finished-task set
            built by ``executeAsync``.
        DATAPATH: folder for the pickle; only used when ``file_name`` is set.
        file_name: pickle file name; an empty string means "do not save".

    Returns:
        The concatenated frame with a fresh index. Returns an empty DataFrame
        when no task has produced a usable result yet. Tasks that are not
        done, were cancelled, raised, or returned None are skipped, so one
        failed contract does not abort the whole checkpoint.
    """
    frames = [
        r.result()
        for r in results
        # order matters: exception() raises on pending or cancelled futures
        if r.done() and not r.cancelled() and r.exception() is None
        and r.result() is not None
    ]
    if not frames:
        return pd.DataFrame([])  # results are not yet ready!

    df = pd.concat(frames, ignore_index=True)
    if file_name:
        df.to_pickle(pathlib.Path(DATAPATH).joinpath(file_name))
    return df

In [59]:
# .preprocessing data for the core engine
def pre_process(cts):
    """Generates tuples for input to the engine.

    Normalises ``cts`` into a tuple of ``(contract, order_or_None)`` items:

    * a single contract                      -> ((contract, None),)
    * an iterable of contracts               -> ((c, None), ...)
    * one flat (contract, order) pair        -> ((contract, order),)
    * an iterable of (contract, order) pairs -> the pairs, unchanged
    * None or an empty iterable              -> ()

    A "contract" is anything with a ``symbol`` attribute. A tuple (never a
    generator) is returned, so callers can take ``len()`` of it.
    """
    if cts is None:
        return ()
    if hasattr(cts, 'symbol'):  # a lone contract
        return ((cts, None),)

    # iterate positionally: a Series' cts[0] would be a *label* lookup
    items = list(cts)
    if all(hasattr(c, 'symbol') for c in items):
        return tuple((c, None) for c in items)

    # a single (contract, order) pair given flat, not wrapped in a list
    if len(items) == 2 and hasattr(items[0], 'symbol'):
        return (tuple(items),)

    return tuple(items)  # assumed to already be (contract, order) pairs

# Testing `pre_process`

In [66]:
# Candidate inputs for pre_process. Each line overrides the previous one,
# so only the LAST assignment reaches the next cell.
cts = []                                                   # nothing
cts = Contract(conId=1234)                                 # one contract
cts = (Contract(conId=1234), MarketOrder('SELL', 3))       # one flat (contract, order) pair
cts = [Contract(conId=1234)]                               # list of one contract
cts = [Contract(conId=1234), Contract(conId=3456)]         # list of contracts
cts = [(Contract(conId=1234), MarketOrder('SELL', 3))]     # list of one pair
cts = [
    (Contract(conId=1234), MarketOrder('SELL', 3)),
    (Contract(conId=3456), MarketOrder('SELL', 3)),
]                                                          # list of pairs
cts = [Contract(conId=1234), MarketOrder('SELL', 3)]       # flat pair given as a list
cts = (Contract(conId=1234), Contract(conId=3456))         # tuple of contracts

In [67]:
[*pre_process(cts)]  # materialise the normalised (contract, order) items for display

[(Contract(conId=1234), None), (Contract(conId=3456), None)]

## Testing `async exec_pool` algo

In [None]:
# Get symlots
df_symlots = pd.read_pickle(DATAPATH / 'df_symlots.pkl')

# distinct underlying contracts listed in the symlots
und_cts = df_symlots['contract'].unique()

In [None]:
# Uncomment for !!! DATA LIMITING underlying contracts
und_cts = [*und_cts[:4]]  # keep only the first 4, as a plain list
und_cts

#### Get the OHLCs

In [None]:
%%time
with IB().connect(HOST, PORT, CID) as ib:
    
    # Get the underlyings
    df_ohlcs = ib.run(executeAsync(ib=ib, algo=ohlc, cts=und_cts, 
                                  CONCURRENT=20, TIMEOUT=10.0, 
                                  post_process=save_df, FSPATH=DATAPATH, OP_FILENAME='',
                                  **{'DURATION': 365, 'OHLC_DELAY': 20},
                                  ))

In [None]:
# scratch: 15 references to ONE MarketOrder object (list multiplication aliases it)
15 * [MarketOrder('SELL', 100)]

### Get underlyings

In [None]:
%%time
with IB().connect(HOST, PORT, CID) as ib:
    
    # Get the underlyings
    df_unds = ib.run(executeAsync(ib=ib, algo=unds, cts=und_cts, 
                                  CONCURRENT=40, TIMEOUT=2.0, 
                                  post_process=save_df, FSPATH=DATAPATH, OP_FILENAME='df_unds.pkl',
                                  **{'FILL_DELAY': 8},
                                  ))

### Make the chains

In [None]:
%%time
# Make the chains
with IB().connect(HOST, PORT, CID) as ib:
    df_chains = ib.run(executeAsync(ib=ib, algo=chains, cts=und_cts,
                                  CONCURRENT=44, TIMEOUT=5,
                                  post_process=save_df, FSPATH=FSPATH, OP_FILENAME='df_chains.pkl',
                                  ))

### Qualify ready-made options

In [None]:
opts = pd.read_pickle(IBDATAPATH.joinpath('df_qopts.pkl'))

# One Option per row: zip only the four columns. Adding range(1, len(opts))
# to the zip would make it one element short and drop the last row.
# NOTE(review): conId='' is kept as before; confirm qualify() accepts it
# (ib_insync's own default for conId is 0).
opts_cts = [Option(symbol=s, lastTradeDateOrContractMonth=e,
                   strike=k, right=r, exchange='SMART', conId='')
            for s, e, k, r in zip(opts.symbol, opts.expiry, opts.strike, opts.right)]
len(opts_cts)

In [None]:
%%time
with IB().connect(HOST, PORT, CID) as ib:
    df_qopts = ib.run(executeAsync(ib=ib, algo=qualify, cts=opts_cts, 
                                  CONCURRENT=200, TIMEOUT=2.0,
                                  post_process=save_df, FSPATH=FSPATH, OP_FILENAME='',))

### Prepare and qualify fresh set of options (MEGA)
* Run this code only if you need the ENTIRE set of options available in the market, for ALL symbols

In [None]:
## Assemble the option contracts to qualify

# Build a put and a call for every (symbol, expiry, strike) row of the chains

puts = [Option(s, e, k, 'P', 'SMART')
        for s, e, k
        in zip(df_chains.symbol, df_chains.expiry, df_chains.strike)]

calls = [Option(s, e, k, 'C', 'SMART')
         for s, e, k
         in zip(df_chains.symbol, df_chains.expiry, df_chains.strike)]

cts = puts + calls

df_cts = util.df(cts).iloc[:, :6].\
            rename(columns={'lastTradeDateOrContractMonth': 'expiry'}).\
                assign(contract=cts)

# Replace `conId` with None to track completeness (item assignment always
# targets the column; attribute assignment may silently set an attribute)
df_cts['conId'] = None

df_cts = df_cts.sample(len(df_cts))  # !!! TO BE DELETED in live run
df_cts = df_cts.reset_index(drop=True)  # Index used to track progress

# *** Done once only in the TEST ***!!!
# DATAPATH is the notebook's data folder (FSPATH is not defined here)
df_cts.to_pickle(DATAPATH.joinpath('all_raw_opts.pkl'))

df_cts = pd.read_pickle(DATAPATH.joinpath('all_raw_opts.pkl'))

s = sorted(df_cts.symbol.unique())
print(f"# of symbols: {len(s)}, # of option contracts: {len(df_cts)}, # no of expiries: {len(df_cts.expiry.unique())}")

In [None]:
%%time

fresh_opts = df_cts.contract.sample(500).to_list() # !!! DATA LIMITER - 500 contracts
with IB().connect(HOST, PORT, CID) as ib:
    df_qopts = ib.run(executeAsync(ib=ib, algo=qualify, cts=fresh_opts, 
                                  CONCURRENT=200, TIMEOUT=5.0,
                                  post_process=save_df, FSPATH=FSPATH, OP_FILENAME='df_qualopts1.pkl',))

### Get the price of qualified options

In [None]:
%%time
df_qopts = pd.read_pickle(FSPATH.joinpath('df_qualopts1.pkl'))

with IB().connect(HOST, PORT, CID) as ib:
    df_price = ib.run(executeAsync(ib=ib, algo=prices, cts=df_qopts.contract.to_list(), 
                                  CONCURRENT=200, TIMEOUT=10.0,
                                  post_process=save_df, FSPATH=FSPATH, OP_FILENAME='',))

### Prepare `cos` — (contract, order) pairs — for margins from qualified options

In [None]:
df_symlots = pd.read_pickle(IBDATAPATH.joinpath('df_symlots.pkl'))
df_raw_opts = pd.read_pickle(IBDATAPATH.joinpath('df_qopts.pkl'))

if MARKET == 'NSE':
    # df_symlots holds a lot per (symbol, 'YYYY-MM' expiry month): join it in
    df_raw_opts['expiryM'] = df_raw_opts.expiry.apply(
        lambda d: d[:4] + '-' + d[4:6])
    cols1 = ['symbol', 'expiryM']
    df_raw_opts = df_raw_opts.set_index(cols1).join(
        df_symlots[cols1 + ['lot']].set_index(cols1)).reset_index()
    # keyword form: pandas 2.0 no longer accepts `axis` positionally in drop()
    df_raw_opts = df_raw_opts.drop(columns='expiryM')
else:
    df_raw_opts['lot'] = 100

# ... build cos (contract, order) pairs
opts = df_raw_opts.contract.to_list()
# SNP sells one contract (lot / lot == 1); otherwise one full lot
orders = [MarketOrder('SELL', lot / lot) if MARKET.upper() ==
          'SNP' else MarketOrder('SELL', lot) for lot in df_raw_opts.lot]
cos = list(zip(opts, orders))

In [None]:
# !!! DATA LIMITER: keep only the first 500 (contract, order) pairs
cos = cos[:500]

### Get option margins

In [None]:
%%time
with IB().connect(HOST, PORT, CID) as ib:
    df_margins = ib.run(executeAsync(ib=ib, algo=margins, cts=cos, 
                                  CONCURRENT=200, TIMEOUT=5.0,
                                  post_process=save_df, FSPATH=FSPATH, OP_FILENAME='df_margins.pkl',))

In [None]:
df_margins  # last expression in the cell: displays the margins frame

### Get option prices

In [None]:
%%time
with IB().connect(HOST, PORT, CID) as ib:
    df_price = ib.run(executeAsync(ib=ib, algo=prices, cts=opts, 
                                  CONCURRENT=200, TIMEOUT=10.0,
                                  post_process=save_df, FSPATH=FSPATH, OP_FILENAME='df_optprices.pkl',))

#### Verifying data integrity

In [None]:
# keep only the options that actually received a price
df = df_price[df_price.price.notna()]

In [None]:
# attach margin, lot and commission to each priced option, matched on conId
df1 = (
    df.set_index('conId')
      .join(df_margins[['conId', 'margin', 'lot', 'comm']].set_index('conId'))
      .reset_index()
)