In [1]:
import base as b
import pandas as pd
import datetime as dt
from typing import Dict
import talib as ta
import utils as ut
from constants import *
from logger_settings import logger

# Strategy parameters; presumably simple-moving-average window lengths
# (not read anywhere in the visible cells -- TODO confirm against base.py).
settings = dict(small_sma=3, big_sma=10)

# NOTE(review): this instrument name (22650 strike, 7 Mar 2024) does not match
# the contract whose ticks are loaded below (22000 strike, 21 Mar 2024 expiry)
# -- confirm which one is intended.
instrument = b.Instrument(name="NIFTY 22650 CALL 7 Mar 2024")

# Contract and trading day whose ticks are fetched into `tdf`.
SYMBOL = "NIFTY"
expiry = dt.date.fromisoformat("2024-03-21")
otype = "CE"
strike = 22000
date = dt.date.fromisoformat("2024-03-15")

tdf = ut.get_ticks(
    symbol=SYMBOL,
    expiry=expiry,
    strike=strike,
    otype=otype,
    date=date,
)

In [7]:
class Phase(b.BasePhase):
    """A directional price phase that ends once price returns to its starting level.

    The phase reads tick data from ``self.strategy.ticks`` and compares the
    most recent ``last_price`` against the ``last_price`` recorded at
    ``self.initiated_at``.

    NOTE(review): ``status``, ``direction``, ``initiated_at`` and ``strategy``
    are assumed to be set up by ``b.BasePhase.__init__`` -- confirm.
    """

    def __init__(self, initiated_at, strategy):
        super().__init__(initiated_at, strategy)

    def _last_price(self):
        """Return ``last_price`` of the most recent tick held by the strategy."""
        return self.strategy.ticks.iloc[-1].last_price

    def _start_price(self):
        """Return ``last_price`` of the tick this phase was initiated at."""
        return self.strategy.ticks.loc[self.initiated_at].last_price

    def has_breached_start(self):
        """Tell whether the latest price has moved back to or through the start price.

        Returns:
            True for an UP phase when the latest price is at or below the start
            price, True for a DOWN phase when it is at or above it, and False
            otherwise (including when the direction is neither UP nor DOWN).
        """
        # Tick data is read through `self.strategy`, the same attribute `next`
        # uses, instead of the previously referenced `self.pm`.
        if self.direction == UP:
            return bool(self._last_price() <= self._start_price())
        if self.direction == DOWN:
            return bool(self._last_price() >= self._start_price())
        return False

    def get_new_phase_start(self):
        """
        Return new phase start when current phase is breached

        Returns:
            The index label of the lowest ``last_price`` recorded after
            ``self.initiated_at`` for an UP phase, otherwise the index label of
            the highest one.
        """
        ticks = self.strategy.ticks
        # Restrict to the price column: idxmin/idxmax on the whole frame would
        # return one label per column instead of a single index label.
        prices_since_start = ticks.loc[ticks.index > self.initiated_at, "last_price"]
        if self.direction == UP:
            return prices_since_start.idxmin()
        return prices_since_start.idxmax()

    def next(self, tick):
        """Advance the phase by one tick.

        Args:
            tick: The incoming tick. It is not read here; the latest row of
                ``self.strategy.ticks`` is used instead.

        Raises:
            Exception: If the phase has already been terminated.
        """
        if self.status == self.STATUS_TERM:
            raise Exception(f"phase {self} already terminated")
        if self.status == self.STATUS_INITIATED:
            # Direction follows the sign of the latest price change; a zero or
            # NaN change falls through to DOWN.
            self.direction = UP if self.strategy.ticks.iloc[-1].ch > 0 else DOWN
            # NOTE(review): nothing in this class moves the status on from
            # INITIATED, so the breach check below is only reached if the
            # status is changed elsewhere -- confirm.
            return
        if self.has_breached_start():
            self.strategy.on_termination()


class PhaseStrategy(b.BasePhaseStrategy):
    """Strategy that tracks one active ``Phase`` at a time over a tick stream.

    NOTE(review): ``self.ticks``, ``self.current_phase`` and ``self.inactive_ph``
    are assumed to be provided by ``b.BasePhaseStrategy`` -- confirm.
    """

    def __init__(self, instrument: "b.Instrument", settings: Dict):
        """Initialise the base strategy and attach a Zerodha order manager.

        Args:
            instrument: Instrument the strategy runs on.
            settings: Strategy settings, passed through to the base class.
        """
        super().__init__(instrument, settings)
        self.om = ut.ZerodhaOrderManager()

    def calculate_data(self):
        """Recompute the derived ``ch`` and ``vwch`` columns on ``self.ticks``."""
        # Tick-to-tick change in last traded price (NaN for the first row).
        self.ticks["ch"] = self.ticks.last_price - self.ticks.last_price.shift(1)
        self.ticks["vwch"] = self.ticks.ch * self.ticks.volume  # Volume weighted change

    def next(self, tick: Dict):
        """Process one tick: refresh derived columns, then start or advance the phase.

        Args:
            tick: The incoming tick as a dict; forwarded to the base class
                (presumably to be appended to ``self.ticks`` -- confirm).
        """
        super().next(tick)
        self.calculate_data()
        if self.current_phase is None:
            # First tick seen: open a phase anchored at the newest row's index label.
            self.current_phase = Phase(initiated_at=self.ticks.iloc[-1].name, strategy=self)
        else:
            self.current_phase.next(tick=tick)

    def on_termination(self):
        """Retire the current phase, square off open orders and start a new phase."""
        terminated = self.current_phase
        # Log the phase that ended rather than the strategy object itself.
        logger.info(f"phase {terminated} terminated")
        if self.om.has_intrade_orders():
            self.om.square_off_all_orders()
        self.inactive_ph.append(terminated)
        # The new start is computed by the phase that was just breached;
        # get_new_phase_start is defined on Phase, not on the strategy.
        new_start = terminated.get_new_phase_start()
        self.current_phase = Phase(initiated_at=new_start, strategy=self)


pm = PhaseStrategy(settings=settings, instrument=instrument)

# Replay only the first 10 ticks as a quick run; iterate over
# range(tdf.shape[0]) instead to replay every loaded tick.
for tick_no in range(10):
    tick = tdf.iloc[tick_no].to_dict()
    pm.next(tick)

print(pm.current_phase)

Phase:DOWN, INITIATED I:0, C:None R:None, T:None, RJ:None
