In [5]:
import sys
sys.path.append('../..')
import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'
import matplotlib.pyplot as plt
import ta
import numpy as np
from utilities.data_manager import ExchangeDataManager

In [129]:
class Strategy():
    def __init__(
        self,
        pair,
        type=["long"],
        params={"leverage": 1},
    ):
        self.df_pair = None
        self.df = None
        self.exchange_name = "mexc"
        self.pair = pair
        self.initial_wallet = 1000
        self.use_long = "long" in type
        self.use_short = "short" in type
        self.params = params
        self.result_df = None

    def get_pair_data(self, timeframe, start = 2050, end = 2050):
        exchange = ExchangeDataManager(
            exchange_name=self.exchange_name,
            path_download="./database/exchanges"
        )

        self.df_pair = exchange.load_data(self.pair, timeframe, start, end)

    def custom_kernel(x, h, alpha, x_0):
    # x is expected to be a pandas Series here
        if len(x) < h + 1:
            return np.nan  # Handle not enough data gracefully
        indices = np.arange(x_0, x_0 - h - 1, -1)
        weights = np.power(1 + (np.power((indices - x_0), 2) / (2 * alpha * h * h)), -alpha)
        weighted_sum = np.dot(weights, x.tail(h+1).values)  # Use .tail(h+1) to ensure we're getting the last h+1 elements
        return weighted_sum / np.sum(weights)

    def custom_atr(df):
        tr = np.log(df['high']) - np.log(df['low'])
        tr = np.maximum.reduce([
            tr,
            np.abs(np.log(df['high']) - np.log(df['close'].shift())),
            np.abs(np.log(df['low']) - np.log(df['close'].shift()))
        ])

        return tr

    def populate_indicators2(self):
        params = self.params
        df = self.df_pair.copy()
        df.drop(
            columns=df.columns.difference(['open','high','low','close','volume']),
            inplace=True
        )
        lookback_window = 8
        relative_weighting = 5.0
        start_regression_bar = 20

        # Calculate log columns
        df['log_high'] = np.log(df['high'])
        df['log_low'] = np.log(df['low'])
        df['log_close'] = np.log(df['close'])

        # Apply the custom kernel using rolling window
        df['customEnvelopeHigh'] = np.exp(df['log_high'].rolling(window=lookback_window + start_regression_bar, min_periods=lookback_window).apply(
            lambda x: custom_kernel(x[::-1], lookback_window, relative_weighting, start_regression_bar), raw=False
        ))
        df['customEnvelopeLow'] = np.exp(df['log_low'].rolling(window=lookback_window + start_regression_bar, min_periods=lookback_window).apply(
            lambda x: custom_kernel(x[::-1], lookback_window, relative_weighting, start_regression_bar), raw=False
        ))

        df['open_long_signal'] = (df['close'].shift(1) > df['customEnvelopeLow'].shift(1)) & (df['close'].shift(2) <= df['customEnvelopeLow'].shift(2))
        df['close_long_signal'] = (df['close'].shift(1) < df['customEnvelopeHigh'].shift(1)) & (df['close'].shift(2) >= df['customEnvelopeHigh'].shift(2))
        #df['open_long_signal'] = (df['close'] > df['customEnvelopeLow']) & (df['close'].shift(1) <= df['customEnvelopeLow'].shift(1))
        #df['close_long_signal'] = (df['close'] < df['customEnvelopeHigh']) & (df['close'].shift(1) >= df['customEnvelopeHigh'].shift(1))

        df_signal = df.loc[
            df["open_long_signal"] | df["close_long_signal"],
            ["open_long_signal", "close_long_signal","open", "close"]
        ]
        df_signal["open_signal_lag"] = df_signal["open_long_signal"].shift(fill_value=False)
        df_signal["close_signal_lag"] = df_signal["close_long_signal"].shift(fill_value=False)
        df_first_signal  = df_signal[
            (~ df_signal["open_signal_lag"] & (df_signal["open_long_signal"] | df_signal["open_long_signal"].isnull())) |
            (~ df_signal["close_signal_lag"] & df_signal["close_long_signal"])
         ]
        df_first_signal["open_signal_lag"] = df_first_signal["open_long_signal"].shift(fill_value=False)
        df_first_signal["close_signal_lag"] = df_first_signal["close_long_signal"].shift(fill_value=False)

        df_order_tmp = df_first_signal[
            (df_first_signal["open_long_signal"] & (~df_first_signal["open_signal_lag"] | df_first_signal["open_long_signal"].isnull())) |
            (df_first_signal["close_long_signal"] & ~ df_first_signal["close_signal_lag"])
        ]
        df_order = df_order_tmp.loc[
            ~ ( ~ df_order_tmp["close_signal_lag"] & ~ df_order_tmp["open_signal_lag"] & df_order_tmp["close_long_signal"])
        ]
        df_order["order_number"] = df_order["open_long_signal"].cumsum()
        df_order["open_lag"] = df_order["open"].shift(-1)
        df_order["open_order"] = df_order["open"].shift()
        df_order.loc[df_order["open_long_signal"], "open_order"] = df_order.loc[df_order["open_long_signal"], "open"]
        df_pair = df[["open", "close", "low", "high", "open_long_signal", "close_long_signal"]]

        leverage = params["leverage"]  # Fixed leverage
        maintenance_margin_percent = 0.004
        wallet = 1000  # Initial wallet balance
        quantity = 0  # Initial quantity

        # Ensure the DataFrame has 'quantity' and 'trade_result' columns initialized
        df_order['quantity'] = 0.0
        df_order['trade_result'] = 0.0
        df_order['trade_result_pct'] = 0.0

        # Iterating over DataFrame rows to process trading signals
        for i, row in df_order.iterrows():
            # Check if there is a signal to open a long position
            if row['open_long_signal']:
                # Calculate the new quantity based on the current wallet and leverage
                quantity = wallet * leverage / row['open']
                # Update the 'quantity' column with the new quantity
                df_order.at[i, 'quantity'] = quantity
                # No change in wallet yet as the position has just opened
                df_order.at[i, 'wallet'] = wallet
                # Track the price at which the position was opened
                open = row['open']
            elif row['close_long_signal']:
                # Calculate the trade result based on the difference between current and open price
                trade_result = (row['open'] - open) * quantity
                # Update the 'trade_result' column with the result of the closed trade
                df_order.at[i, 'trade_result'] = trade_result
                df_order.at[i, 'trade_result_pct'] = trade_result / wallet * 100
                # Update the wallet with the result of the trade
                wallet += trade_result
                # Reset quantity as the trade is closed
                df_order.at[i, 'quantity'] = quantity
                quantity = 0

            # Update the wallet and quantity for the current row
            df_order.at[i, 'wallet'] = wallet

        df_order_tmp = df_order[
            ["order_number", "quantity", "trade_result", "trade_result_pct", "wallet", "open_order"]
        ]
        df_order_final = df_pair.join(df_order_tmp)

        f = df_order_final['order_number'].ffill()
        b = df_order_final['order_number'].bfill()

        df_order_final['order_number'] = df_order_final['order_number'].mask(f == b, f)

        f = df_order_final['open_order'].ffill()
        b = df_order_final['open_order'].bfill()

        df_order_final['open_order'] = df_order_final['open_order'].mask(f == b, f)
        df_order_final['wallet'] = df_order_final['wallet'].ffill()
        #df_order_final["hypothetical_wallet"] = df_order_final["wallet"] + df_order_final["quantity"] * (df_order_final['open'] - df_order_final["open_order"])
        df_order_final["hypothetical_wallet"] = df_order_final["wallet"].shift() + df_order_final["quantity"] * (df_order_final['open'] - df_order_final["open_order"])
        df_order_final["hypothetical_low_result"] =  ((df_order_final["quantity"] * df_order_final["low"]) - df_order_final["wallet"]) / df_order_final["wallet"]
        df_order_final["drawdown"] = (df_order_final["low"] - df_order_final["open_order"]) / df_order_final["open_order"] * 100 * leverage
        df_order_final["is_liquidated"] = df_order_final['hypothetical_wallet'] < (df_order_final["wallet"] / leverage) * maintenance_margin_percent

        self.df = df_order_final


    def get_result_df(self):
        try:
            df = self.df
            if df is not None:
                total_trades = df.order_number.max()
                final_wallet_amount = df.loc[df["open_long_signal"], "wallet"].tail(1)
                total_profit = final_wallet_amount - self.initial_wallet
                total_profit_perc = total_profit / self.initial_wallet * 100
                avg_trade_profit_perc = df["trade_result_pct"].dropna().mean()
                avg_trade_profit = df["trade_result"].dropna().mean()
                max_drawdown = df["drawdown"].min()

                result_df = pd.DataFrame(
                    {
                        "params": str(self.params),
                        "final_wallet_amount": final_wallet_amount,
                        "total_profit": total_profit,
                        "total_profit_perc": total_profit_perc,
                        "total_trades": total_trades,
                        "avg_trade_profit_perc": avg_trade_profit_perc,
                        "avg_trade_profit": avg_trade_profit,
                        "max_drawdown": max_drawdown,
                    }
                )

                return result_df
            else:
                return None
        except Exception as e:
            print(e)
            print(self.params)
            return None

In [130]:
pair = "API3/USDT:USDT"
exchange_name = "mexc"
tf = '1h'
start_date = "2023-01-01 00:00:00"
end_date = "2024-05-14 01:00:00"

In [136]:
def get_pair_data(timeframe, pair, start = 2050, end = 2050):
        exchange = ExchangeDataManager(
            exchange_name=exchange_name,
            path_download="./database/exchanges"
        )

        return exchange.load_data(pair, timeframe, start, end)

In [189]:
df = get_pair_data(timeframe=tf, pair=pair, start=start_date, end=end_date)

In [190]:
def custom_kernel(x, h, alpha, x_0):
    """
    Calculate the kernel weighted average using a window of data points from pandas Series.

    Args:
    x (pandas Series): Rolling window of data points.
    h (int): Number of data points to consider (window size).
    alpha (float): Decay factor controlling weight curvature.
    x_0 (int): Position index to calculate relative weights, usually the last index of the window.

    Returns:
    float: Kernel weighted average of the window.
    """
    if len(x) < h + 1:
        return np.nan  # Not enough data to compute the kernel

    x = np.log(x)

    indices = np.arange(x_0 - h, x_0 + 1)
    weights = np.power(1 + (np.power((x_0 - indices), 2) / (2 * alpha * h * h)), -alpha)
    sum_weights = np.sum(weights)
    sum_x_weights = np.dot(weights, x)

    return np.exp(sum_x_weights / sum_weights) if sum_weights != 0 else np.nan

In [191]:
# Settings
custom_lookback_window = 8
custom_relative_weighting = 5.0
custom_start_regression_bar = 20

In [192]:
df["envelope_low"] = (
    df["low"]
    .rolling(window=custom_lookback_window+1)
    .apply(
        lambda x: custom_kernel(
            x, custom_lookback_window, custom_relative_weighting, custom_start_regression_bar
        ),
        raw=True
    )
)

df["envelope_high"] = (
    df["high"]
    .rolling(window=custom_lookback_window+1)
    .apply(
        lambda x: custom_kernel(
            x, custom_lookback_window, custom_relative_weighting, custom_start_regression_bar
        ),
        raw=True
    )
)

In [193]:
df["open_long_signal"] = df["envelope_low"].shift(1) < df["close"].shift(1)

In [206]:
type(df["open"])

pandas.core.series.Series

In [203]:
df.index[0].tz

In [143]:
envelope_close = custom_kernel(df["close"], custom_lookback_window, custom_relative_weighting, custom_start_regression_bar)
envelope_high = custom_kernel(df["high"], custom_lookback_window, custom_relative_weighting, custom_start_regression_bar)
envelope_low = custom_kernel(df["low"], custom_lookback_window, custom_relative_weighting, custom_start_regression_bar)

In [134]:
strat = Strategy(pair=pair)
strat.get_pair_data(timeframe=tf, start=start_date, end=end_date)
strat.populate_indicators2()
df_result_tmp = strat.get_result_df()
df = strat.df