In [2]:
from pybit.unified_trading import WebSocket
from strat_globals import pretty
from time import sleep
import pandas as pd
import numpy as np
from pybit.unified_trading import HTTP
from quantfreedom.indicators.talib_ind import from_talib
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from config_mine import api_key_test, api_secret_test
from quantfreedom._typing import pdFrame

In [3]:
# Two pybit HTTP clients: `session_real` is created with default settings (no
# credentials passed) and is used for candle downloads; `session_test` targets
# the testnet with API keys and is used for account queries and orders.
session_real = HTTP()
session_test = HTTP(
    testnet=True,
    api_key=api_key_test,
    api_secret=api_secret_test,
)
symbol = "BTCUSDT"

# Taker and maker rates live in the same response entry, so fetch it once
# instead of issuing two identical requests.
fee_rates = session_test.get_fee_rates(symbol=symbol)["result"]["list"][0]
market_fee_pct = float(fee_rates["takerFeeRate"])
limit_fee_pct = float(fee_rates["makerFeeRate"])

# Maintenance margin of the first risk-limit entry for the traded symbol
# (uses `symbol` rather than a second hard-coded "BTCUSDT").
mmr_pct = float(
    session_test.get_risk_limit(
        category="linear",
        symbol=symbol,
    )["result"]["list"][0]["maintenanceMargin"]
)

# Set to True by entry_checker once the latest candle's high is below the
# bottom of the watched price range.
dropped_below_range = False
# Scales the total possible loss (`tpl`) computed further down.
trade_counter = 0
# Fractions (not percents): cap on equity at risk, per-trade risk size, and
# the extra distance subtracted from the pivot low to get the stop price.
max_equity_risk_pct = 6.0 / 100
size_pct = 1.0 / 100
add_pct_to_sl = 0.25 / 100

In [4]:
def dl_data():
    """Download 5-minute candles for ``symbol`` and compute a 15-period RSI.

    Reads the module-level ``session_real`` client and ``symbol``.

    Returns:
        tuple: ``(price_data, rsi_data)``. ``price_data`` is a DataFrame
        indexed by ``open_time`` (ascending) with MultiIndex columns
        ``(symbol, open|high|low|close)``; the most recent row is removed.
        ``rsi_data`` is whatever ``from_talib`` returns for ``func_name='rsi'``.

    Raises:
        ValueError: if the exchange returns no candles.
    """
    # NOTE(review): every other request in this notebook uses
    # category="linear"; confirm "inverse" is intended for this symbol.
    kline_rows = session_real.get_kline(
        category="inverse",
        symbol=symbol,
        interval=5,
    )["result"]["list"]
    if not kline_rows:
        raise ValueError(f"no kline data returned for {symbol}")

    # np.float64 instead of np.float_: the alias was removed in NumPy 2.0.
    # The last two columns of each row are discarded, leaving the timestamp
    # followed by open/high/low/close.
    data = np.asarray(kline_rows, dtype=np.float64)[:, :-2]

    data_columns = pd.MultiIndex.from_tuples(
        [
            (symbol, "open"),
            (symbol, "high"),
            (symbol, "low"),
            (symbol, "close"),
        ],
        name=["symbol", "candle_info"],
    )
    data_index = pd.Index(
        data=pd.to_datetime(data[:, 0].flatten(), unit="ms"),
        name="open_time",
    )
    # Sort oldest -> newest, then drop the newest row (presumably the candle
    # that is still forming -- TODO confirm).
    price_data = pd.DataFrame(
        data[:, 1:],
        columns=data_columns,
        index=data_index,
    ).sort_index(ascending=True).iloc[:-1]

    rsi_data = from_talib(
        func_name='rsi',
        price_data=price_data,
        timeperiod=15,
    )
    return price_data, rsi_data

In [5]:
def entry_checker(
    rsi_data: pdFrame,
    price_data: pdFrame,
    price_range_start: float,
    price_range_end: float,
    lookback: int,
    pivot_low_lookback: int,
    plot_results: bool = False
):
    """Check the latest candle for an entry signal inside a price range.

    An entry is flagged when all of the following hold:

    * the module-level ``dropped_below_range`` flag is still False,
    * the latest low lies strictly between ``price_range_end`` and
      ``price_range_start``,
    * the latest low is below every earlier low in the pivot window,
    * the latest candle closed below its open, and
    * one of the ``lookback`` candles before the latest one also closed below
      its open and has an RSI lower than the latest RSI.

    If the first group of conditions fails and the latest high is below
    ``price_range_end``, ``dropped_below_range`` is set to True.

    Args:
        rsi_data: RSI values, one per row of ``price_data``.
        price_data: candles whose columns are, in order, open/high/low/close.
        price_range_start: upper bound of the watched price range.
        price_range_end: lower bound of the watched price range.
        lookback: number of candles before the latest one to scan.
        pivot_low_lookback: size of the trailing window (including the latest
            candle) used for the pivot low; ``None`` uses all rows.
        plot_results: when True, show a candlestick + RSI figure.

    Returns:
        tuple: ``(entry, latest_open, stop_price)`` where ``stop_price`` is
        the window's lowest low reduced by the module-level ``add_pct_to_sl``.
    """
    global dropped_below_range

    rsi_values = rsi_data.values.flatten()
    price_open_values = price_data.values[:, 0]
    price_high_values = price_data.values[:, 1]
    price_low_values = price_data.values[:, 2]
    price_close_values = price_data.values[:, 3]

    # Placeholders for the plot: -1 points at the latest candle, NaN hides the
    # divergence markers when no entry was found.
    divergence_index = np.full(2, -1, dtype=np.int_)
    eval_line_graph = np.full(2, np.nan)
    candle_line_graph = np.full(2, np.nan)

    if pivot_low_lookback is None:
        window_lows = price_low_values
    else:
        window_lows = price_low_values[-pivot_low_lookback:]
    pivot_low = window_lows.min()

    temp_candle_low = price_low_values[-1]

    # The window minimum includes the latest candle, so comparing the latest
    # low against it with "<" could never succeed. Compare against the earlier
    # candles of the window instead.
    prior_lows = window_lows[:-1]
    made_new_low = prior_lows.size > 0 and temp_candle_low < prior_lows.min()

    # Scan backwards starting at the candle just before the latest one. This
    # equals the former hard-coded 197 when there are 199 rows, and the stop
    # is clamped so the scan never wraps around to negative indices.
    first_lb = rsi_values.size - 2
    stop_lb = max(first_lb - lookback, -1)

    entry = False
    if (
        not dropped_below_range
        and temp_candle_low < price_range_start
        and temp_candle_low > price_range_end
        and made_new_low
        and price_close_values[-1] < price_open_values[-1]
    ):
        for lb in range(first_lb, stop_lb, -1):
            if (
                rsi_values[-1] > rsi_values[lb]
                and price_close_values[lb] < price_open_values[lb]
            ):
                entry = True

                # plotly graphing stuff: markers are drawn slightly below the
                # candle lows / RSI values they point at
                candle_line_graph[0] = price_low_values[lb] - (
                    price_low_values[lb] * 0.0002
                )
                candle_line_graph[1] = temp_candle_low - (temp_candle_low * 0.0002)

                eval_line_graph[0] = rsi_values[lb] - (rsi_values[lb] * 0.05)
                eval_line_graph[1] = rsi_values[-1] - (rsi_values[-1] * 0.05)

                divergence_index[0] = lb

                break
    elif price_high_values[-1] < price_range_end:
        # we have gone out of the range so turn off looking for a trade in the range
        dropped_below_range = True

    if plot_results:
        pivot_low_graph = np.where(
            price_low_values == pivot_low, pivot_low - (pivot_low * 0.0002), np.nan
        )
        plot_index = price_data.index
        divergence_times = plot_index[divergence_index.tolist()]

        fig = make_subplots(
            rows=2,
            cols=1,
            row_heights=[0.7, 0.3],
            shared_xaxes=True,
            vertical_spacing=0.02,
        )
        # Reuse the positional OHLC arrays so plotting does not depend on a
        # hard-coded symbol column name.
        fig.add_candlestick(
            x=plot_index,
            open=price_open_values,
            high=price_high_values,
            low=price_low_values,
            close=price_close_values,
            row=1,
            col=1,
            name="Candles",
        )
        fig.add_scatter(
            x=plot_index,
            y=pivot_low_graph,
            mode="markers",
            marker=dict(size=10, color="MediumPurple"),
            name=f"Lowest Candle {pivot_low_lookback} lb",
            row=1,
            col=1,
        )
        fig.add_scatter(
            x=divergence_times,
            y=candle_line_graph,
            mode="markers+lines",
            marker=dict(size=10, symbol="arrow-up"),
            name="Candle Line Div",
            row=1,
            col=1,
            showlegend=False,
        )

        fig.add_scatter(
            x=plot_index,
            y=rsi_values,
            name="RSI",
            row=2,
            col=1,
        )

        fig.add_scatter(
            x=divergence_times,
            y=eval_line_graph,
            mode="markers+lines",
            marker=dict(size=10, symbol="arrow-up"),
            name="RSI div",
            row=2,
            col=1,
            showlegend=False,
        )

        fig.update_layout(
            xaxis_rangeslider_visible=False,
            height=800,
        )

        fig.show()

    return entry, price_open_values[-1], pivot_low - (pivot_low * add_pct_to_sl)

In [6]:
# Refresh candles and RSI, then evaluate the entry rules unless an earlier
# check already flagged that price left the watched range.
price_data, rsi_data = dl_data()
if not dropped_below_range:
    entry_settings = dict(
        price_range_start=31000,
        price_range_end=30000,
        lookback=25,
        pivot_low_lookback=100,
    )
    entry, price_open, sl_price = entry_checker(
        rsi_data=rsi_data,
        price_data=price_data,
        **entry_settings,
    )

In [7]:
# Read the position once so `position` and `average_entry` come from the same
# snapshot (previously two separate requests were made).
position_info = session_test.get_positions(
    category="linear",
    symbol=symbol,
)["result"]["list"][0]
# `or 0` treats an empty-string field as zero instead of letting float("")
# raise -- presumably the API can return "" when no position is open; verify.
position = float(position_info["positionValue"] or 0)
average_entry = float(position_info["avgPrice"] or 0)
if average_entry == 0:
    # No average entry reported: fall back to the latest candle's open.
    average_entry = price_open

equity = float(
    session_test.get_wallet_balance(
        accountType="CONTRACT",
        coin="USDT",
    )["result"]["list"][0]["coin"][0]["walletBalance"]
)
# Total possible loss: `size_pct` of equity for each trade counted so far
# plus the one being sized now, rounded to a whole number.
tpl = round((equity * size_pct) * (trade_counter + 1), 0)

In [8]:
def get_size_value():
    """Compute the order size value from the module-level trade state.

    Reads ``tpl``, ``equity``, ``max_equity_risk_pct``, ``position``,
    ``price_open``, ``average_entry``, ``sl_price``, ``market_fee_pct`` and
    ``limit_fee_pct``.

    Returns:
        float: the size for adding to an existing position (when ``tpl`` is
        within the equity risk cap and ``position`` is positive), the size for
        a fresh position (when ``position`` is zero), otherwise ``np.nan``.
    """
    risk_cap = round(equity * max_equity_risk_pct, 0)

    # Adding to an open position while still inside the risk cap.
    if tpl <= risk_cap and position > 0:
        numerator = (
            -tpl * price_open * average_entry
            + price_open * position * average_entry
            - sl_price * price_open * position
            + sl_price * price_open * position * market_fee_pct
            + price_open * position * average_entry * limit_fee_pct
        )
        denominator = average_entry * (
            price_open
            - sl_price
            + price_open * limit_fee_pct
            + sl_price * market_fee_pct
        )
        return -(numerator / denominator)

    # No open position: size the first entry.
    if position == 0:
        loss_fraction = (
            sl_price / price_open - 1 - limit_fee_pct - sl_price * market_fee_pct / price_open
        )
        return -tpl / loss_fraction

    # Risk cap exceeded (or a non-positive, non-zero position): no valid size.
    return np.nan
size_value = get_size_value()

In [9]:
def get_leverage(sl_padding_pct=0.002, max_leverage=90):
    """Derive the leverage from the stop price and the average entry.

    Reads the module-level ``average_entry``, ``sl_price`` and ``mmr_pct``.

    Args:
        sl_padding_pct: fraction of the stop price subtracted as extra
            padding (0.002 = 0.2%). TODO: make this the user's chosen padding.
        max_leverage: upper cap applied to the result.

    Returns:
        float: the computed leverage, capped at ``max_leverage``.
    """
    leverage = -average_entry / (
        sl_price
        - sl_price * sl_padding_pct  # padding below the stop price
        - average_entry
        - mmr_pct * average_entry
    )  # math checked
    return min(leverage, max_leverage)
leverage = round(get_leverage(),2)

In [10]:
def get_tp_price(rr):
    """Compute the take-profit price for a given reward multiple.

    Reads the module-level ``average_entry``, ``tpl``, ``size_value`` and
    ``limit_fee_pct``.

    Args:
        rr: multiplier applied to ``tpl`` (the call below passes 2).

    Returns:
        float: the take-profit price.
    """
    reward_term = average_entry * tpl * rr
    size_term = average_entry * size_value
    fee_term = average_entry * size_value * limit_fee_pct
    size_after_fee = size_value * (1 - limit_fee_pct)
    return (reward_term + size_term + fee_term) / size_after_fee

tp_price = get_tp_price(2)

In [11]:
# NOTE(review): this cell submits an order without checking the `entry` flag
# returned by entry_checker -- confirm that is intended.
# Cancel any resting USDT-settled orders before submitting a new one.
session_test.cancel_all_orders(
    category="linear",
    settleCoin="USDT",
)
try:
    session_test.set_leverage(
        category="linear",
        symbol=symbol,
        buyLeverage=f"{leverage}",
        sellLeverage=f"{leverage}",
    )
except Exception as exc:
    # Still best-effort, but report the failure instead of hiding it: if the
    # leverage was not applied, the order below runs with whatever leverage
    # the account currently has.
    print(f"set_leverage({leverage}) failed, continuing with current leverage: {exc}")

# Post-only limit buy with an attached stop loss. The quantity is the size
# value divided by the average entry, rounded to 4 decimals.
# NOTE(review): the limit price is hard-coded while qty is derived from
# `average_entry`; confirm these are meant to differ.
order_id = session_test.place_order(
    category="linear",
    symbol=symbol,
    side="Buy",
    orderType="Limit",
    price='30100',
    qty=f"{round(get_size_value() / average_entry, 4)}",
    timeInForce="PostOnly",
    stopLoss=f"{round(sl_price,1)}",
)['result']['orderId']

InvalidRequestError: estimated will trigger liq (ErrCode: 10001) (ErrTime: 14:01:12).
Request → POST https://api-testnet.bybit.com/v5/order/create: {"category": "linear", "symbol": "BTCUSDT", "side": "Buy", "orderType": "Limit", "price": "30100", "qty": "0.002", "timeInForce": "PostOnly", "stopLoss": "28824.2"}.

In [30]:
# Look up the entry order; the returned list can be empty, so guard the
# index access instead of letting it raise IndexError.
order_list = session_test.get_open_orders(
    category="linear",
    symbol=symbol,
    orderId=order_id,
)["result"]["list"]

if order_list and order_list[0]["orderStatus"] == "Filled":
    # Entry filled: rest a reduce-only, post-only limit sell.
    # NOTE(review): qty and price are hard-coded here rather than derived
    # from the entry size and `tp_price` -- confirm before relying on this.
    session_test.place_order(
        category="linear",
        symbol=symbol,
        side="Sell",
        orderType="Limit",
        qty="100",
        price="32000",
        timeInForce="PostOnly",
        reduceOnly=True,
    )