In [203]:
from binance.client import Client
import pandas as pd
import plotly.graph_objects as go
from pon import api,secret
import numpy as np
import schedule
import time
from plotly.subplots import make_subplots
import pandas_ta as ta
import warnings
warnings.filterwarnings("ignore")


In [204]:
def get_price_data_binance(ticker:str,limit:int,interval = Client.KLINE_INTERVAL_1DAY)->pd.DataFrame:
    client = Client(api, secret)
    df = pd.DataFrame(client.get_historical_klines(symbol=ticker, interval=interval,start_str='20 Aug, 2021', end_str='20 Jan, 2022'))
    df.columns=['date','open','high','low','close','volume','close_time','d1','d2','d3','d4','d5']
    df = df.drop(['close_time','d1','d2','d3','d4','d5'],axis=1)
    df['date'] = pd.to_datetime(df['date']*1000000)
    df['open'] = df['open'].astype(float)
    df['high'] = df['high'].astype(float)
    df['low'] = df['low'].astype(float)
    df['close'] = df['close'].astype(float)
    return df
    
def get_list_of_up_imbalances(df:pd.DataFrame)->list:
    up_imbalances = []
    for i in range(2, len(df)):
        prev_candle = df.iloc[i-2]
        next_candle = df.iloc[i]
        if prev_candle['high'] < next_candle['low']:
            up_imbalances.append([round((prev_candle['high']+next_candle['low'])/2,2), prev_candle['high'],df.iloc[i-1]['date']])
    return up_imbalances

def get_list_of_down_imbalances(df:pd.DataFrame)->list:
    down_imbalances = []
    for i in range(2, len(df)):
        prev_candle = df.iloc[i-2]
        next_candle = df.iloc[i]
        if prev_candle['low'] > next_candle['high']:
            down_imbalances.append([round((prev_candle['low']+next_candle['high'])/2,2) , prev_candle['low'] , df.iloc[i-1]['date']])
    return down_imbalances

def delete_up_imbalance(up_imbalances, df:pd.DataFrame)->list:
    for i in range(0, len(df)-10):
        for imbalance in up_imbalances:
            candle = df.iloc[i]
            if candle['date']>imbalance[2]  and (candle['close'] < imbalance[0] or candle['low'] < imbalance[0]):
                up_imbalances.remove(imbalance)
    return up_imbalances[-3:]

def delete_down_imbalance(down_imbalances, df:pd.DataFrame)->list:
    for i in range(0, len(df)-10):
        for imbalance in down_imbalances:
            candle = df.iloc[i]
            if candle['date']>imbalance[2] and (candle['close'] > imbalance[0] or candle['high'] > imbalance[0]):
                down_imbalances.remove(imbalance)
    
    return down_imbalances[-3:]

def add_moving_average(period:int,df:pd.DataFrame)->pd.DataFrame:
    df['SMA'] = df['close'].rolling(period).mean()
    df['SMAslope'] = df['SMA'].pct_change()*100

    df['up_reversal'] = ((df['SMAslope'].shift(1) < 0) & (df['SMAslope'] > 0))
    df['down_reversal'] = ((df['SMAslope'].shift(1) > 0) & (df['SMAslope'] < 0)) 
    # df['charthighest'] = df['high'].rolling(6).max()[df['down_reversal'] == True]
    # df['chartlowest'] = df['low'].rolling(6).min()[df['up_reversal'] == True]
    # up_imbalances = get_list_of_up_imbalances(df)
    # down_imbalances = get_list_of_down_imbalances(df)

    # df['up_reversal'] = False
    # df['down_reversal'] = False
    # for i in range(5, len(df)-3):
    #     for imbalance in up_imbalances:
    #         if (df.iloc[i-1]['date'] < imbalance[2] < df.iloc[i+3]['date']):
    #             if (df.iloc[i-1]['SMAslope'] < 0) & (df.iloc[i]['SMAslope'] > 0):
    #                 if not (df.iloc[i-1]['up_reversal'] or df.iloc[i-2]['up_reversal'] or df.iloc[i-3]['up_reversal'] or df.iloc[i-4]['up_reversal']):
    #                     df.loc[i, 'up_reversal'] = True
    #     for imbalance in down_imbalances:
    #         if (df.iloc[i-1]['date'] < imbalance[2] < df.iloc[i+3]['date']):
    #             if ((df.iloc[i-1]['SMAslope'] > 0) & (df.iloc[i]['SMAslope'] < 0)):
    #                 if not (df.iloc[i-1]['down_reversal'] or df.iloc[i-2]['down_reversal'] or df.iloc[i-3]['down_reversal'] or df.iloc[i-4]['down_reversal']):
    #                     df.loc[i, 'down_reversal'] = True


    # df['up_reversal'] = ((df['SMAslope'].shift(1) < 0) & (df['SMAslope'] > 0))
    # df['down_reversal'] = ((df['SMAslope'].shift(1) > 0) & (df['SMAslope'] < 0))

    return df

def add_rsx_HF(period:int,df:pd.DataFrame)->pd.DataFrame:
    df["RSX"] = ta.rsx(round((df['close']+df['high']+df['low'])/3,2), period)
    df['RSXslope'] = df['RSX'].pct_change()*100
    df['RSX_up_reversal'] = ((df['RSXslope'].shift(1) < 0) & (df['RSXslope'] > 0)) & (df['RSX'] < 30)
    df['RSX_down_reversal'] = ((df['RSXslope'].shift(1) > 0) & (df['RSXslope'] < 0)) & (df['RSX'] > 70)

    up_reversals = []
    down_reversals = []
    # df['hidden_up_divergence'] = False
    # df['hidden_down_divergence'] = False
    df['classic_up_divergence'] = False
    df['classic_down_divergence'] = False

    for i in range(0, len(df)):
        if df.iloc[i]['RSX_up_reversal']:
            up_reversals.append(min((df.iloc[i]['low'],df.iloc[i-1]['RSX'],df.iloc[i]['date']),
                (df.iloc[i-1]['low'],df.iloc[i-1]['RSX'],df.iloc[i-1]['date']),
                (df.iloc[i-2]['low'],df.iloc[i-1]['RSX'],df.iloc[i-2]['date']),
                (df.iloc[i-3]['low'],df.iloc[i-1]['RSX'],df.iloc[i-3]['date']),
                (df.iloc[i-4]['low'],df.iloc[i-1]['RSX'],df.iloc[i-4]['date']),
                (df.iloc[i-5]['low'],df.iloc[i-1]['RSX'],df.iloc[i-5]['date'])))

        if df.iloc[i]['RSX_down_reversal']:
            down_reversals.append(max((df.iloc[i]['high'],df.iloc[i-1]['RSX'],df.iloc[i]['date']),
                (df.iloc[i-1]['high'],df.iloc[i-1]['RSX'],df.iloc[i-1]['date']),
                (df.iloc[i-2]['high'],df.iloc[i-1]['RSX'],df.iloc[i-2]['date']),
                (df.iloc[i-3]['high'],df.iloc[i-1]['RSX'],df.iloc[i-3]['date']),
                (df.iloc[i-4]['high'],df.iloc[i-1]['RSX'],df.iloc[i-4]['date']),
                (df.iloc[i-5]['high'],df.iloc[i-1]['RSX'],df.iloc[i-5]['date'])))

        if down_reversals:
            if df.iloc[i]['date']>down_reversals[-1][2]:
                if df.iloc[i]['high'] > down_reversals[-1][0]:
                    if df.iloc[i]['RSX'] < down_reversals[-1][1]:
                        print(df.iloc[i]['high'])

        # if (up_reversals and down_reversals):
        #     for up_reversal in up_reversals[-5:]:
        #         if (up_reversal[0] < df.iloc[i]['low']) and (up_reversal[1] > df.iloc[i]['RSX']) and up_reversal[2] < df.iloc[i]['date']:
        #             df.loc[i, 'hidden_up_divergence'] = True
                
        #     for down_reversal in down_reversals[-5:]:
        #         if (down_reversal[0] > df.iloc[i]['high']) and (down_reversal[1] < df.iloc[i]['RSX']) and down_reversal[2] < df.iloc[i]['date']:
        #             df.loc[i, 'hidden_down_divergence'] = True
    # for i in range(0, len(df)):
    #     if df.iloc[i]['date']>down_reversals[2][2]:
    #         if df.iloc[i]['high'] > down_reversals[2][0]:
    #             if df.iloc[i]['RSX'] < down_reversals[2][1]:
    #                 print(df.iloc[i]['high'])
    print(up_reversals)
    print(down_reversals)
    return df

def add_rsx_LF(period:int,df:pd.DataFrame)->pd.DataFrame:
    df["RSX"] = ta.rsx(round((df['close']+df['high']+df['low'])/3,2), period)
    df['RSXslope'] = df['RSX'].pct_change()*100
    df['RSX_up_reversal'] = ((df['RSXslope'].shift(1) < 0) & (df['RSXslope'] > 0)) & (df['RSXslope'].shift(1).rolling(5).sum()<-6)
    df['RSX_down_reversal'] = ((df['RSXslope'].shift(1) > 0) & (df['RSXslope'] < 0)) & (df['RSXslope'].shift(1).rolling(5).sum()>6)

    up_reversals = []
    down_reversals = []
    df['classic_up_divergence'] = False
    df['classic_down_divergence'] = False

    for i in range(len(df) - 200, len(df)):
        if df.iloc[i]['RSX_up_reversal']:
            up_reversals.append(min((df.iloc[i]['low'],df.iloc[i-1]['RSX'],df.iloc[i]['date']),
                (df.iloc[i-1]['low'],df.iloc[i-1]['RSX'],df.iloc[i-1]['date']),
                (df.iloc[i-2]['low'],df.iloc[i-1]['RSX'],df.iloc[i-2]['date']),
                (df.iloc[i-3]['low'],df.iloc[i-1]['RSX'],df.iloc[i-3]['date']),
                (df.iloc[i-4]['low'],df.iloc[i-1]['RSX'],df.iloc[i-4]['date']),
                (df.iloc[i-5]['low'],df.iloc[i-1]['RSX'],df.iloc[i-5]['date'])))

        if df.iloc[i]['RSX_down_reversal']:
            down_reversals.append(max((df.iloc[i]['high'],df.iloc[i-1]['RSX'],df.iloc[i]['date']),
                (df.iloc[i-1]['high'],df.iloc[i-1]['RSX'],df.iloc[i-1]['date']),
                (df.iloc[i-2]['high'],df.iloc[i-1]['RSX'],df.iloc[i-2]['date']),
                (df.iloc[i-3]['high'],df.iloc[i-1]['RSX'],df.iloc[i-3]['date']),
                (df.iloc[i-4]['high'],df.iloc[i-1]['RSX'],df.iloc[i-4]['date']),
                (df.iloc[i-5]['high'],df.iloc[i-1]['RSX'],df.iloc[i-5]['date'])))
                
        if (up_reversals and down_reversals):
            for up_reversal in up_reversals[-1:]:    
                if (up_reversal[0] > df.iloc[i]['low']) and (up_reversal[1] < df.iloc[i]['RSX']) and up_reversal[2] < df.iloc[i]['date']:
                    df.loc[i, 'classic_up_divergence'] = True

            for down_reversal in down_reversals[-1:]:       
                if (down_reversal[0] < df.iloc[i]['high']) and (down_reversal[1] > df.iloc[i]['RSX']) and down_reversal[2] < df.iloc[i]['date']:
                    df.loc[i, 'classic_down_divergence'] = True
                
    return df

def backtest(data: pd.DataFrame, buy_orders: list, sell_orders: list,depo:int,risk:int):
    # Initialize variables to store statistics
    buy_profit = 0
    buy_loss = 0
    sell_profit = 0
    sell_loss = 0
    
    total_trades = 0
    
    win_buy_trades = 0
    lose_buy_trades = 0

    win_sell_trades = 0
    lose_sell_trades = 0
    lose = depo*0.01*risk
    
    # Iterate through buy orders
    for order in buy_orders:
        open_price = order[0]
        stop_loss = order[1]
        take_profit = order[2]
        open_date = order[3]
        total_trades += 1

        # Iterate through data starting from the open date
        for i in range(1, len(data)):
            if data.iloc[i]['date'] > open_date:
                if data.iloc[i]['high'] >= take_profit:
                    depo += round((take_profit - open_price)/(open_price - stop_loss),2)*lose-(depo*0.01*risk*0.04)
                    win_buy_trades += 1
                    break
                elif data.iloc[i]['low'] <= stop_loss:
                    depo -= 1*lose+(depo*0.01*0.04)
                    lose_buy_trades += 1
                    break
    
    # Iterate through sell orders
    for order in sell_orders:
        open_price = order[0]
        stop_loss = order[1]
        take_profit = order[2]
        open_date = order[3]
        total_trades += 1
        
        # Iterate through data starting from the open date
        for i in range(1, len(data)):
            if data.iloc[i]['date'] > open_date:
                if data.iloc[i]['low'] <= take_profit:
                    depo += round((open_price - take_profit)/(stop_loss - open_price),2)*lose-(depo*0.01*risk*0.04)
                    win_sell_trades += 1
                    break
                elif data.iloc[i]['high'] >= stop_loss:
                    depo -= 1*lose+(depo*0.01*0.04)
                    lose_sell_trades += 1
                    break
    
    print(f"""
        Total trades: {total_trades},
        Total win trades: {win_buy_trades + win_sell_trades}
        Total lose trades: {lose_buy_trades + lose_sell_trades}
        Winrate: {round((win_buy_trades+win_sell_trades)/total_trades*100,2)}%
        Winrate buy trades: {round((win_buy_trades)/len(buy_orders)*100,2)}%
        Winrate sell trades: {round((win_sell_trades)/len(sell_orders)*100,2)}%
        Deposit: {depo}"""
    )


In [205]:
df_higher_tf = get_price_data_binance('BTCUSDT',1000,interval=Client.KLINE_INTERVAL_1DAY)
# df_lower_tf = get_price_data_binance('BTCUSDT',1000,interval=Client.KLINE_INTERVAL_5MINUTE)

In [206]:
# df_lower_tf = add_rsx_LF(21,df_lower_tf)
# df_lower_tf = add_moving_average(3,df_lower_tf)

df_higher_tf = add_rsx_HF(7,df_higher_tf)

fig = go.Figure(data=[go.Candlestick(x=df_higher_tf['date'],
                open=df_higher_tf['open'],
                high=df_higher_tf['high'],
                low=df_higher_tf['low'],
                close=df_higher_tf['close'])])

# fig1 = go.Figure(data=[go.Candlestick(x=df_lower_tf['date'],
#                 open=df_lower_tf['open'],
#                 high=df_lower_tf['high'],
#                 low=df_lower_tf['low'],
#                 close=df_lower_tf['close'])])

# buy_orders = []
# sell_orders = []
# multiplier = 3
# for i in range(3, len(df_lower_tf)):
#     if df_lower_tf.iloc[i-3]['low'] > df_lower_tf.iloc[i-2]['low'] and df_lower_tf.iloc[i-1]['close'] > df_lower_tf.iloc[i-3]['open'] and df_lower_tf.iloc[i-3]['open'] > df_lower_tf.iloc[i-3]['close']:
#         if df_higher_tf.iloc[i//3]['hidden_up_divergence']:
#             buy_orders.append((df_lower_tf.iloc[i]['open'], df_lower_tf.iloc[i-2]['low'],
#             round(df_lower_tf.iloc[i]['open'] + (df_lower_tf.iloc[i]['open']-df_lower_tf.iloc[i-2]['low'])*multiplier,2),
#             df_lower_tf.iloc[i]['date']))

#     if df_lower_tf.iloc[i-3]['high'] < df_lower_tf.iloc[i-2]['high'] and df_lower_tf.iloc[i-1]['close'] < df_lower_tf.iloc[i-3]['open'] and df_lower_tf.iloc[i-3]['open'] < df_lower_tf.iloc[i-3]['close']:
#         if df_higher_tf.iloc[i//3]['hidden_down_divergence']:
#             sell_orders.append((df_lower_tf.iloc[i]['open'], df_lower_tf.iloc[i-2]['high'],
#             round(df_lower_tf.iloc[i]['open'] - (df_lower_tf.iloc[i-2]['high']-df_lower_tf.iloc[i]['open'])*multiplier,2),
#             df_lower_tf.iloc[i]['date']))

# backtest(df_lower_tf,buy_orders,sell_orders,100,1)

# for order in buy_orders:
#     fig1.add_trace(go.Scatter(x=[order[3]],y=[order[0]],mode='markers', marker=dict(size=4, color='green')))
#     fig1.add_trace(go.Scatter(x=[order[3]],y=[order[1]],mode='markers', marker=dict(size=4, color='green')))
#     fig1.add_trace(go.Scatter(x=[order[3]],y=[order[2]],mode='markers', marker=dict(size=4, color='green')))

# for order in sell_orders:
#     fig1.add_trace(go.Scatter(x=[order[3]],y=[order[0]],mode='markers', marker=dict(size=4, color='red')))
#     fig1.add_trace(go.Scatter(x=[order[3]],y=[order[1]],mode='markers', marker=dict(size=4, color='red')))
#     fig1.add_trace(go.Scatter(x=[order[3]],y=[order[2]],mode='markers', marker=dict(size=4, color='red')))

rsx = go.Figure(go.Scatter(
        x=df_higher_tf['date'],
        y=df_higher_tf['RSX']
    ))

for i in range(len(df_higher_tf)):
    if df_higher_tf.iloc[i]['RSX_up_reversal']:
        rsx.add_vline(x=df_higher_tf.iloc[i]['date'],line=dict(color='green'))
    if df_higher_tf.iloc[i]['RSX_down_reversal']:
        rsx.add_vline(x=df_higher_tf.iloc[i]['date'],line=dict(color='red'))



fig.update_layout(showlegend=False)
fig.show()
# fig1.update_layout(showlegend=False)
# fig1.show()
rsx.update_layout(showlegend=False)
rsx.show()

64486.0
67789.0
68524.25
52088.0
[(39600.0, 22.017292221022934, Timestamp('2021-09-21 00:00:00')), (55600.0, 7.041857012400888, Timestamp('2021-11-19 00:00:00')), (42000.3, 15.531198423547021, Timestamp('2021-12-04 00:00:00')), (46751.0, 26.791420251232363, Timestamp('2021-12-11 00:00:00')), (45678.0, 20.24948862631217, Timestamp('2021-12-31 00:00:00')), (40501.0, 1.2578871593506402, Timestamp('2022-01-08 00:00:00'))]
[(48843.2, 72.96660965595038, Timestamp('2021-09-18 00:00:00')), (62933.0, 91.3887123028639, Timestamp('2021-10-15 00:00:00')), (67000.0, 92.74345584327428, Timestamp('2021-10-20 00:00:00')), (69000.0, 83.52771468941297, Timestamp('2021-11-10 00:00:00')), (51810.0, 85.61846242931406, Timestamp('2021-12-24 00:00:00'))]


In [207]:
df_higher_tf.loc[df_higher_tf['classic_down_divergence']]

Unnamed: 0,date,open,high,low,close,volume,RSX,RSXslope,RSX_up_reversal,RSX_down_reversal,classic_up_divergence,classic_down_divergence
