### TODO

1. lambda functions to create quote and order notification messages
2. implement threads for handling live streaming
  - Design json message structure for each handler
  - Implement decoder for json messages for each handler
3. Implement trade job - init, scanner, order
4. Implement user request handler


#### Future
1. Define function to restart a stopped thread
2. Watchdog implementation to resume processes
3. Implementation of user initiated aborts and restart

## Thread functions

In [1]:
from lib.multitasking_lib import *
from lib.kite_helper_lib import *
import sys

## Definition of handler functions

In [2]:
# A thread function to process notifications and tick
no_of_hist_candles = 100

def msg_to_ohlc(data):
    ohlc_list = list(data.values())[0]['ohlc']
    sdate = ohlc_list['date']
    shigh = ohlc_list['high']
    slow = ohlc_list['low']
    sopen = ohlc_list['open']
    sclose = ohlc_list['close']
    svolume = ohlc_list['volume']

    temp_df = pd.DataFrame(ohlc_list, columns=['open','high','low','close','volume'], 
                           index=[datetime.strptime(ohlc_list['date'],'%Y-%m-%d')])
    
    return temp_df

trade_lock = Lock()
def trade_job(msg):
    pdebug('trade_job: {}'.format(msg))
    
    data = json.loads(msg)
    
    # Step 1.1: Get stock name from the message
    for key in data.keys():
        stock = key.split(':')[1]
        exchange = key.split(':')[0]
    
    hash_key = stock+'_state'
    
    trade_lock.acquire()
    # Step 1.2: Get state for the stock from the redis
    state = conn.hget(hash_key,'state')
    if not state:
        state = 'INIT'
        conn.hmset(hash_key, {'state':state,'stock':stock, 'qty':0,'price':0,'algo':'','freq':'day','so':0,'target':0})
    
    freq = conn.hget(hash_key,'freq')
    pinfo("{}: {}: {}".format(stock,state, data))
    # Step 2: Switch to appropriate state machine based on current state
    if state == 'INIT': # State: Init
        # 1: Populate Redis buffer stock+"OHLCBuffer" with historical data
        temp_df = msg_to_ohlc(data)
        
        toDate = (temp_df.index[0] - timedelta(days=1)).strftime('%Y-%m-%d')
        fromDate = (temp_df.index[0] - timedelta(days=no_of_hist_candles)).strftime('%Y-%m-%d')
        
        ohlc_data = getData(stock, fromDate, toDate, exchange, freq, False, stock)
        ohlc_data = ohlc_data.append(temp_df)

        conn.hset(hash_key,'ohlc',ohlc_data.to_json())
        
        # 2: Set state to Scanning
        conn.hset(hash_key,'state','SCANNING')
        pass
    
    elif state == 'SCANNING':  # State: Scanning
        temp_df = msg_to_ohlc(data)
        #pdebug(temp_df)
        ohlc_data = pd.read_json(conn.hget(hash_key, 'ohlc'))
        ohlc_data = ohlc_data.append(temp_df)
        conn.hset(hash_key,'ohlc',ohlc_data.to_json())
   
        # 1: Run trading algorithm for entering trade
        tradeDecision = False
        
        # 2: If Algo returns Buy: set State to 'Pending Order: Long'
        if tradeDecision:
            conn.hset(hash_key,'state','PO:LONG')
        
        # 3: If Algo returns Sell: set State to 'Pending Order: Short'
        if tradeDecision:
            conn.hset(hash_key,'state','PO:SHORT')
        
        # 4: Update TradeMetaData: Push order details to OrderQueue
        
        pass
    
    elif state == 'PO:LONG': # State: Pending Order: Long
    
        # 1: On Fill: set State to Long
        conn.hset(hash_key,'state','LONG')
        pass
    
    
    elif state == 'PO:SHORT': # State: Pending Order: Short
    
        # 1: On Fill: set State to Short
        conn.hset(hash_key,'state','SHORT')
        pass
    
    
    elif state == 'LONG': # State: Long
    
        # 1: If notification for AutoSquare Off: set state to init
        
        # 2: Else run trading algorithm for square off
        
        # 3: If algo returns square off: then push square off details to OrderQueue, set state to 'Awaiting Square Off'   
        conn.hset(hash_key,'state','SQUAREOFF')
        pass
    
    
    elif state == 'SHORT': # State: Short
    
        # 1: If notification for AutoSquare Off: set state to init
        
        # 2: Else run trading algorithm for square off
        
        # 3: If algo returns square off: then push square off details to OrderQueue, set state to 'Awaiting Square Off'
    
        conn.hset(hash_key,'state','SQUAREOFF')
        pass
        
    elif state == 'SQUAREOFF':  # State: Awaiting Square Off
        
        conn.hset(hash_key,'state','INIT')
        pass
   
        # 1: On Fill notification: set state to Init
    
    
    trade_lock.release()
 
if False:
    msg_ohlc = {"NSE:WIPRO": {"ohlc": {"date": "2019-1-1", "open": 1896.0, "high": 1910.0, "low": 1885.0, "close": 1902.8, "volume": 1094883.0}}}

    stock='WIPRO'
    state = 'INIT'
    hash_key = stock+'_state'

    try:
        all_keys = list(conn.hgetall(hash_key).keys())
        conn.hdel(hash_key,*all_keys)
    except:
        pass

    conn.hmset(hash_key, {'state':state,'stock':stock, 'qty':0,'price':0,'algo':'','freq':'day','so':0,'target':0})

    logger.setLevel(logging.INFO)
    trade_job(json.dumps(msg_ohlc))
    #trade_job(json.dumps(msg_ohlc))
    #trade_job(json.dumps(msg_ohlc))
    #trade_job(json.dumps(msg_ohlc))
    #trade_job(json.dumps(msg_ohlc))
    #trade_job(json.dumps(msg_ohlc))

if False:
    msg_ohlc = {"NSE:WIPRO": {"ohlc": {"date": "2019-1-3", "open": 1896.0, "high": 1910.0, "low": 1885.0, "close": 1902.8, "volume": 1094883.0}}}
    trade_job(json.dumps(msg_ohlc))

In [3]:
def trade_handler(manager, msg):
    pdebug('trade_handler: {}'.format(msg))
    # Step 1: Blocking call to msgBufferQueue and notificationQueue
    conn.xtrim('msgBufferQueue',maxlen=0, approximate=False)
    conn.xtrim('notificationQueue',maxlen=0, approximate=False)
    while(True):
        msg = conn.xread({'msgBufferQueue':'$','notificationQueue':'$'}, block=0, count=100)
        msgs = conn.xread({'msgBufferQueue':'0','notificationQueue':'0'}, block=1, count=100)
        conn.xtrim('msgBufferQueue',maxlen=0, approximate=False)
        conn.xtrim('notificationQueue',maxlen=0, approximate=False)
        
        # Step 2: Process notifications: Start a worker thread for each notification
        
        #TODO
        
        # Step 3: Process tick: Start a worker thread for each msg
        try:
            for msg in msgs[0][1]:
                for key in json.loads(msg[1]['msg']).keys():
                    stock = key.split(':')[1]
                    
                # Add to OHLCBuffer
                manager.add(stock, trade_job, False, msg[1]['msg'])
                pdebug(msg[0])
        except:
            perror("Un-supported message: {} : {}".format(msg, sys.exc_info()[0]))

In [4]:
#freedom_init = threadManager("freedom_init", ["freedom_init"], [freedom_init])   

#freedom = threadManager("freedom", ["user_requests_handler", "kite_simulator", "backtest_handler", "trade_handler","order_handler"], 
#                        [user_requests_handler, kite_simulator, backtest_handler, trade_handler, order_handler])

logger.setLevel(logging.INFO)
freedom = threadManager("freedom", [ "kite_simulator", "trade_handler", "order_handler"], 
                        [kite_simulator, trade_handler, order_handler])

04-18 15:00:30:INFO:	NumExpr defaulting to 2 threads.
04-18 15:00:30:INFO:	Using cache: Not downloading data
04-18 15:00:30:INFO:	WIPRO: INIT: {'NSE:WIPRO': {'ohlc': {'date': '2019-1-1', 'open': 248.68, 'high': 250.19, 'low': 244.66, 'close': 245.6, 'volume': 1513704.0}}}
04-18 15:00:30:INFO:	Using cache: Not downloading data
04-18 15:00:30:INFO:	WIPRO: SCANNING: {'NSE:WIPRO': {'ohlc': {'date': '2019-1-3', 'open': 245.86, 'high': 246.58, 'low': 242.89, 'close': 244.74, 'volume': 3542281.0}}}
04-18 15:00:30:INFO:	WIPRO: SCANNING: {'NSE:WIPRO': {'ohlc': {'date': '2019-1-4', 'open': 243.98, 'high': 245.75, 'low': 240.34, 'close': 243.95, 'volume': 2405278.0}}}
04-18 15:00:30:INFO:	WIPRO: SCANNING: {'NSE:WIPRO': {'ohlc': {'date': '2019-1-7', 'open': 245.19, 'high': 245.68, 'low': 242.33, 'close': 243.8, 'volume': 1903604.0}}}
04-18 15:00:30:INFO:	WIPRO: SCANNING: {'NSE:WIPRO': {'ohlc': {'date': '2019-1-8', 'open': 244.36, 'high': 246.24, 'low': 242.56, 'close': 244.77, 'volume': 2084659.0}

04-18 15:00:33:INFO:	WIPRO: SCANNING: {'NSE:WIPRO': {'ohlc': {'date': '2019-3-14', 'open': 259.35, 'high': 259.7, 'low': 255.0, 'close': 256.85, 'volume': 4489473.0}}}
04-18 15:00:33:INFO:	WIPRO: SCANNING: {'NSE:WIPRO': {'ohlc': {'date': '2019-3-15', 'open': 258.0, 'high': 264.25, 'low': 257.2, 'close': 263.45, 'volume': 8720097.0}}}
04-18 15:00:33:INFO:	WIPRO: SCANNING: {'NSE:WIPRO': {'ohlc': {'date': '2019-3-18', 'open': 264.75, 'high': 264.75, 'low': 256.05, 'close': 257.6, 'volume': 5748614.0}}}
04-18 15:00:33:INFO:	WIPRO: SCANNING: {'NSE:WIPRO': {'ohlc': {'date': '2019-3-19', 'open': 257.8, 'high': 258.9, 'low': 255.1, 'close': 257.45, 'volume': 5051847.0}}}
04-18 15:00:33:INFO:	WIPRO: SCANNING: {'NSE:WIPRO': {'ohlc': {'date': '2019-3-20', 'open': 258.4, 'high': 263.0, 'low': 258.4, 'close': 261.55, 'volume': 6114562.0}}}
04-18 15:00:33:INFO:	WIPRO: SCANNING: {'NSE:WIPRO': {'ohlc': {'date': '2019-3-22', 'open': 262.45, 'high': 263.75, 'low': 258.5, 'close': 260.55, 'volume': 31980

In [5]:
freq='day'
exchange='NSE'
symbol='WIPRO'
fromDate = '2019-01-01'
toDate = '2019-05-03'
stock=symbol

state = 'INIT'
hash_key = stock+'_state'

try:
    all_keys = list(conn.hgetall(hash_key).keys())
    conn.hdel(hash_key,*all_keys)
except:
    pass

#conn.hmset(hash_key, {'state':state,'stock':stock, 'qty':0,'price':0,'algo':'','freq':'day','so':0,'target':0})

logger.setLevel(logging.INFO)
msg = json.dumps({'stock': stock, 'fromDate':fromDate,'toDate':toDate, 'exchange':exchange, 'freq':freq})
conn.publish('trade_handler','start')
msgid2 = conn.publish('kite_simulator',msg)


In [7]:
stock = 'TCS'
msg = json.dumps({'stock': stock, 'fromDate':fromDate,'toDate':toDate, 'exchange':exchange, 'freq':freq})
conn.publish('trade_handler','start')
msgid2 = conn.publish('kite_simulator',msg)
notification_despatcher(None, 'msg')

In [6]:
conn.pubsub_channels()

['order_handler', 'trade_handler', 'backtest', 'kite_simulator']

In [7]:
conn.publish('kite_simulator','stop')
conn.publish('trade_handler','stop')
freedom.job.terminate()
print(freedom.job.is_alive())
conn.pubsub_channels()

True


['order_handler', 'trade_handler', 'backtest']

In [8]:
notification_despatcher(None, 'msg')
pd.read_json(conn.hget(stock+'_state','ohlc'))

Unnamed: 0,close,high,low,open,volume
2018-09-24,250.15,254.14,248.31,248.31,3740868
2018-09-25,246.84,251.80,245.30,250.38,3389131
2018-09-26,240.04,247.33,237.89,246.77,7864825
2018-09-27,239.81,243.35,238.12,242.03,17997289
2018-09-28,243.61,245.60,237.14,239.85,6912444
...,...,...,...,...,...
2019-04-04,259.05,262.25,254.95,261.00,6419198
2019-01-11,247.14,248.05,243.23,244.81,1598939
2019-04-08,263.70,265.50,260.75,263.70,3637464
2019-04-09,273.80,275.00,263.35,263.50,13991384


## Scratchpad