In [4]:
from tradero import Strategy
from tradero.live import BybitSesh
from tradero.backtesting import Backtest, run_backtests
from tradero.ta import RSI, PIVOT, PivotIndicator, RGXPIVOT, NBS
from tradero.test import BTCUSDT
from tradero.risk import entry_adjustment, sl_by_ratio, tp_by_ratio


In [None]:
from tradero.test import BTCUSDT
from trader.backtesting import Backtest, Strategy
from tradero.risk import entry_adjustment, tp_by_ratio, sl_by_ratio
from tradero.ta import PIVOT, RGXPIVOT, NBS, ZIGZAG, RSI
import re 

def size_by_risk(cash, risk, entry, sl):
    """Calcula el size basado en el riesgo porcential"""
    risk_amount = cash * risk
    sl_dist = abs(entry - sl)
    size = risk_amount / sl_dist
    return size

def sl_adjustment(entry, sl, adjustment=1):
    """
    Ajusta el Stop Loss basado en la distancia desde el entry.
    
    Args:
        entry: Precio de entrada
        sl: Stop Loss original
        adjustment: Factor de ajuste (1 = misma distancia)
        
    Returns:
        Nuevo Stop Loss ajustado
        
    Examples:
        >>> sl_adjustment(100, 90, 0.5)  # COMPRA
        95.0
        >>> sl_adjustment(100, 90, 1.5)  # VENTA
        85.0
    """
    is_buy = entry > sl
    new_sl_dist = abs(entry - sl) * adjustment
    return (entry - new_sl_dist) if is_buy else (entry + new_sl_dist)

def tp_adjustment(entry, tp, adjustment=1):
    """
    Ajusta el Take Profit basado en la distancia desde el entry.
    
    Args:
        entry: Precio de entrada
        tp: Take Profit original
        adjustment: Factor de ajuste (1 = misma distancia)
        
    Returns:
        Nuevo Take Profit ajustado
        
    Examples:
        >>> tp_adjustment(100, 120, 0.5)  # COMPRA
        95.0
        >>> tp_adjustment(100, 120, 1.5)  # VENTA
        130.0
    """
    is_buy = tp > entry
    new_tp_dist = abs(tp - entry) * adjustment
    return (entry + new_tp_dist) if is_buy else (entry - new_tp_dist)


class Neutralizers_A1_v0(Strategy):
    async def init(self):
        self.risk = 0.005
        self.entry_adjust_factor = -0.4 # basado en sl
        self.tp_adjustment = 1.1
        self.sl_adjustment = 1
        self.sl_ratio = 1/3
        # from tradero.core import _CryptoBacktestSesh
        self.rq_pattern = r"L-boo  (H-(neu|shk))  (L-emp  H-(emp|boo)  ){0,10} L-neu$".replace(" ", "") 
        self.entry_pivot = "L-neu"
        self.A, self.B, self.C = None, None, None

    async def on_data(self):
        sesh = self.get_sesh()
        time = await sesh.get_time()

        if time.minute % 5 != 0:
            return
        
        all_data_5m = (await sesh.get_data(symbol='BTCUSDT', timeframe='5m', limit=300))
        data_5m = all_data_5m[:-1]
        unclosed_bar = all_data_5m[-1]

        #print(f"\033[91m{time}\033[0m")

        position = await sesh.get_position_status(symbol='BTCUSDT')
        is_active_position = position['size'] != 0 # ; print(f"  position: {position["size"]:.2f}, active?: {is_active_position}")
        orders = await sesh.get_all_orders(symbol="BTCUSDT") 
        pending_orders = [order["orderStatus"] == "Pending" for order in orders]#; print(f"  pending_orders: {any(pending_orders)}")

        # Calcular Indicadores
        nbs = await self.I(NBS, data=data_5m)
    
        any_pending_order = any(pending_orders)
        if is_active_position:
            return
            #
        elif any_pending_order: # si hay ordenes pendientes
            if self.B :
                if data_5m.High[-1] > self.B:
                    self.A, self.B, self.C = None, None, None
                    await sesh.cancel_all_orders(symbol="BTCUSDT")
                    #
        else: # Buscar entrada  
            current_pattern = "".join(nbs.ptypes) # Patron actual en str
            found_pattern = re.search(self.rq_pattern, current_pattern)
            if found_pattern:

                total_equity = float((await sesh.get_balance())["totalEquity"]) # TODO: arreglar la funcion original para que devuelva un float (si es así en BybitSesh)
                
                basic_pivot_values = PIVOT(data=data_5m).values
                pivot_tag_values = list(zip(nbs.ptypes, basic_pivot_values)) 

                b_index = [val for ptag, val in pivot_tag_values if ptag == "H-neu" or ptag == "H-shk"]

                self.B = [val for ptag, val in pivot_tag_values if ptag == "H-neu" or ptag == "H-shk"][-1] # ultimo valor del neu pivot H
                self.C = [val for ptag, val in pivot_tag_values if ptag == "L-neu"][-1] # ultimo valor del neu pivot L
                
                is_broken_pattern = any(all_data_5m.Low[-10:] < self.C)

                if is_broken_pattern:
                    return 

                # Gestion Base
                entry = self.C
                tp = self.B
                sl = sl_by_ratio(ratio=self.sl_ratio, entry=self.C, tp=self.B)
                # Adjustment
                tp = tp_adjustment(entry, tp, adjustment=self.tp_adjustment)
                sl = sl_adjustment(entry, sl, adjustment=self.sl_adjustment)
                price = entry_adjustment(entry_original=entry, sl=sl, adjust_factor=self.entry_adjust_factor)
                # Calcular el size depues de los ajustes
                size = size_by_risk(risk=self.risk, cash=total_equity, entry=price, sl=sl)
            
                # #### ORDEN ###
                await sesh.buy(symbol="BTCUSDT", price=price, tp_price=tp, sl_price=sl, size=size)

    async def on_exit(self):
        sesh = self.sesh
        data_15m = await sesh.get_data(symbol="BTCUSDT", timeframe="15m")
        data_5m = await sesh.get_data(symbol="BTCUSDT", timeframe="5m")
        await self.I(ZIGZAG, data=data_5m)
        await self.I(RSI, data=data_15m)
   
class Neutralizers_3P(Strategy):
    async def init(self):
        self.risk = 0.005
        self.entry_adjust_factor = -0.4 # basado en sl
        self.tp_adjustment = 1.1
        self.sl_adjustment = 1
        self.sl_ratio = 1/3
        # from tradero.core import _CryptoBacktestSesh
        # self.rq_pattern = r"(<L-(?:emp|boo|shk)[^>]*>)  (<H-(?:emp|neu)[^>]*>)  (<L-boo[^>]*>)  (<H-(?:neu|shk)[^>]*>)  (?:<L-emp[^>]*><H-(?:emp|boo)[^>]*>){0,10}  (<L-neu[^>]*>)  $  ".replace(" ", "") 
        self.A, self.B, self.C, self.D, self.E = None, None, None, None, None
        self.rq_pattern = re.compile(r"(<L-(?:emp|boo|shk)[^>]*>)  (<H-(?:emp|neu)[^>]*>) (<L-boo[^>]*>) (<H-(?:neu|shk)[^>]*>) (?:<L-emp[^>]*><H-(?:emp|boo)[^>]*>){0,10} (<L-neu[^>]*>) $".replace(" ", "") )
        # self.rq_pattern = re.compile(r" (<L-boo[^>]*>) (<H-(?:neu|shk)[^>]*>) (?:<L-emp[^>]*><H-(?:emp|boo)[^>]*>){0,10} (<L-neu[^>]*>) $".replace(" ", "") )

    async def on_data(self):
        sesh = self.get_sesh()
        time = await sesh.get_time()

        if time.minute % 5 != 4:
            return
        
        # if time.minute % 5 != 0:
        #     return
        

        all_data_5m = (await sesh.get_data(symbol='BTCUSDT', timeframe='5min', limit=300))
        data_5m = all_data_5m[:-1]
        unclosed_bar = all_data_5m[-1]

        position = await sesh.get_position_status(symbol='BTCUSDT')
        is_active_position = position['size'] != 0 # ; print(f"  position: {position["size"]:.2f}, active?: {is_active_position}")
        orders = await sesh.get_all_orders(symbol="BTCUSDT") 
        pending_orders = [order["orderStatus"] == "Pending" for order in orders]#; print(f"  pending_orders: {any(pending_orders)}")
        # Calcular Indicadores
        # print(RGX_PIV.search(self.rq_pattern))
        any_pending_order = any(pending_orders)
        if is_active_position:
            return
            #
        elif any_pending_order: # si hay ordenes pendientes
            if self.D:
                if data_5m.High[-1] > self.D:
                    self.A, self.B, self.C, self.D, self.E = None, None, None, None, None
                    await sesh.cancel_all_orders(symbol="BTCUSDT")
                    #
        else: # Buscar entrada  
            RGX_PIV = await self.I(RGXPIVOT, data=data_5m, nbs=True)
            found_pattern = RGX_PIV.search(self.rq_pattern)
            if found_pattern:
                RGX_PIV.set_tag(self.rq_pattern, ('a','b','c','d','e')) 

                total_equity = float((await sesh.get_balance())["totalEquity"]) # TODO: arreglar la funcion original para que devuelva un float (si es así en BybitSesh)
                
                pivot_values = RGX_PIV.values
                # pivot_tag_values = list(zip(nbs.ptypes, basic_pivot_values)) 
                tags = RGX_PIV.tags
                self.A = pivot_values[tags == 'a'][0]
                self.B = pivot_values[tags == 'b'][0]
                self.C = pivot_values[tags == 'c'][0]
                self.D = pivot_values[tags == 'd'][0]
                self.E = pivot_values[tags == 'e'][0]

                is_broken_pattern = any(all_data_5m.Low[-10:] < self.E)
                if is_broken_pattern:
                    return 

                # Gestion Base
                de_diff, ae_diff = abs(self.D - self.E), abs(self.A - self.E)
                is_close_to_e = (ae_diff < de_diff / 2) and (self.A < self.E)

                entry = self.A if is_close_to_e else self.E
                tp = self.D
                sl = sl_by_ratio(ratio=self.sl_ratio, entry=entry, tp=tp)
                # Adjustment
                tp = tp_adjustment(entry, tp, adjustment=self.tp_adjustment)
                sl = sl_adjustment(entry, sl, adjustment=self.sl_adjustment)
                price = entry_adjustment(entry_original=entry, sl=sl, adjust_factor=self.entry_adjust_factor)
                # Calcular el size depues de los ajustes
                size = size_by_risk(risk=self.risk, cash=total_equity, entry=price, sl=sl)
            
                # #### ORDEN ###
                await sesh.buy(symbol="BTCUSDT", price=price, tp_price=tp, sl_price=sl, size=size)

    async def on_exit(self):
        sesh = self.sesh
        data_15m = await sesh.get_data(symbol="BTCUSDT", timeframe="15min")
        data_5m = await sesh.get_data(symbol="BTCUSDT", timeframe="5min")
        await self.I(ZIGZAG, data=data_5m)
        await self.I(NBS, data=data_5m)
        # await self.I(RSI, data=data_15m.Close)
   
class test_strategy(Strategy):
    async def init(self):
        pass
    async def on_data(self):
        sesh = self.get_sesh()
        time = await sesh.get_time()
        # print(time)
        if time.minute % 5 != 4:
            return
        
        # data_1min = (await sesh.get_data(symbol='BTCUSDT', timeframe='1min', limit=300)).df
        # print(f" \033[0m{data_1min.index[-1]}  o:{data_1min.Open.iloc[-1]:.0f}  h:{data_1min.High.iloc[-1]:.0f}  l:{data_1min.Low.iloc[-1]:.0f}  c:{data_1min.Close.iloc[-1]:.0f}  v:{data_1min.Volume.iloc[-1]:.0f}\033[0m")
        # all_data_5m = (await sesh.get_data(symbol='BTCUSDT', timeframe='5min', limit=300))
        # all_data_1m = (await sesh.get_data(symbol='BTCUSDT', timeframe='1min', limit=300))
        # data_5min = (await sesh.get_data(symbol='BTCUSDT', timeframe='5min', limit=300)).df
        # print(f" \033[91m{data_5min.index[-1]}  o:{data_5min.Open.iloc[-1]:.0f}  h:{data_5min.High.iloc[-1]:.0f}  l:{data_5min.Low.iloc[-1]:.0f}  c:{data_5min.Close.iloc[-1]:.0f}  v:{data_5min.Volume.iloc[-1]:.0f}\033[0m")
        # print(all_data_5m.Close[-1], all_data_5m.index[-1])
        # df_5m = all_data_5m.df
        # print(f"{df_5m.index[-1]}  o:{df_5m.Open.iloc[-1]:.0f}  h:{df_5m.High.iloc[-1]:.0f}  l:{df_5m.Low.iloc[-1]:.0f}  c:{df_5m.Close.iloc[-1]:.0f}  v:{df_5m.Volume.iloc[-1]:.0f}")
        # print(f"{df_5m.index[-2]}  o:{df_5m.Open.iloc[-2]:.0f}  h:{df_5m.High.iloc[-2]:.0f}  l:{df_5m.Low.iloc[-2]:.0f}  c:{df_5m.Close.iloc[-2]:.0f}  v:{df_5m.Volume.iloc[-2]:.0f}")
        # print(f"5 min {time}   {df_5m.index[-1]}  o:{df_5m.Open.iloc[-1]:.0f}  h:{df_5m.High.iloc[-1]:.0f}  l:{df_5m.Low.iloc[-1]:.0f}  c:{df_5m.Close.iloc[-1]:.0f}  v:{df_5m.Volume.iloc[-1]:.0f}")

        # print(f"index: {all_data_5m.index[-1].astype("datetime64[m]")} :  o: {all_data_5m.Open[-1]:.0f},  h: {all_data_5m.High[-1]:.0f}, l: {all_data_5m.Low[-1]:.0f}, c: {all_data_5m.Close[-1]:.0f}, v: {all_data_5m.Volume[-1]:.0f}")
    async def on_exit(self):
        all_data_5m = (await self.sesh.get_data(symbol='BTCUSDT', timeframe='5min', limit=300))
        # print(f"{all_data_5m.df[["Open", "High", "Low", "Close", "Volume"]].iloc[:].round(0)}")
    
""" Neutralizers 5m """
# data = BTCUSDT[2450:2700].resample("5m")#[1600:2000]
# klines = BTCUSDT[:8000].resample("5m")#[1600:2000]
packet = {"BTCUSDT": BTCUSDT[:].resample("1min")}#[1600:2000]
bt = Backtest(strategy=Neutralizers_3P, packet=packet, margin=1/20, cash=100_000, warmup=50)
stats = await bt.run(mae_metric_type="ROE", p_bar=True)
bt.plot( 
    plot_equity=False,
    plot_return=True,
    plot_trades=True,
    relative_equity=True,
    plot_volume=True,
    # width=1000,
    # height=500,
    timeframe="5min",
)  # 19:00:00 c: 104489 v 215