### 1 - Import test data

In [None]:
import pandas as pd
import pandas_ta as ta

df = pd.read_csv("EURUSD_Candlestick_5_M_BID_01.02.2023-17.02.2024.csv")
df["Gmt time"]=df["Gmt time"].str.replace(".000","")
df['Gmt time']=pd.to_datetime(df['Gmt time'],format='%d.%m.%Y %H:%M:%S')
df=df[df.High!=df.Low]

In [None]:
df["EMA_slow"]=ta.ema(df.Close, length=50)
df["EMA_fast"]=ta.ema(df.Close, length=40)
df['ATR']=ta.atr(df.High, df.Low, df.Close, length=7)
df

In [None]:
from statsmodels.nonparametric.kernel_regression import KernelReg
import numpy as np

dfsample = df[0:]

X = dfsample.index
model = KernelReg(endog=dfsample['Close'], exog=dfsample.index, var_type='c', reg_type='lc', bw=[3])
fitted_values, marginal_effects = model.fit()

# Add fitted values to DataFrame
dfsample['NW_Fitted'] = fitted_values

# Calculate residuals
residuals = dfsample['Close'] - fitted_values

# Calculate standard deviation of residuals
std_dev = 2.*np.std(residuals)
#std_dev = dfsample['Close'].rolling(window=30).std()

# Calculate upper and lower envelopes
dfsample['Upper_Envelope'] = dfsample['NW_Fitted'] + std_dev
dfsample['Lower_Envelope'] = dfsample['NW_Fitted'] - std_dev

dfsample[0:100]

In [None]:
my_bbands = ta.bbands(dfsample.Close, length=30, std=2)
dfsample=dfsample.join(my_bbands)
#1.105424	1.097329

In [None]:
dfsample.columns

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime

# Create a plot with 1 row
fig = make_subplots(rows=1, cols=1)

# Add candlestick plot with customized line colors
fig.add_trace(go.Candlestick(x=dfsample.index,
                             open=dfsample['Open'],
                             high=dfsample['High'],
                             low=dfsample['Low'],
                             close=dfsample['Close'],
                             increasing=dict(line=dict(color='rgba(0, 255, 0, 0.6)', width=0.1), # Red with transparency for increasing
                                             fillcolor='rgba(0, 255, 0, 0.6)'),  # Match fill color with line color
                             decreasing=dict(line=dict(color='rgba(255, 0, 0, 0.6)', width=0.1), # Green with transparency for decreasing
                                             fillcolor='rgba(255, 0, 0, 0.6)')), # Match fill color with line color
              row=1, col=1)

# Add Nadaraya-Watson fitted line
fig.add_trace(go.Scatter(x=dfsample.index, y=dfsample['NW_Fitted'],
                         line=dict(color='green', width=2),
                         name="Nadaraya-Watson Fit"),
              row=1, col=1)

# Add upper standard deviation envelope
fig.add_trace(go.Scatter(x=dfsample.index, y=dfsample['Upper_Envelope'],
                         line=dict(color='rgba(0,0,255,0.2)'), # Light blue color
                         name='Upper Envelope',
                         showlegend=False),
              row=1, col=1)

# Add lower standard deviation envelope
fig.add_trace(go.Scatter(x=dfsample.index, y=dfsample['Lower_Envelope'],
                         line=dict(color='rgba(0,0,255,0.2)'), # Light blue color
                         name='Lower Envelope',
                         fill='tonexty', # This fills the area between this trace and the next trace
                         fillcolor='rgba(0,0,255,0.3)', # More opaque blue fill
                         showlegend=False),
              row=1, col=1)


# Add Bollinger Bands
# fig.add_trace(go.Scatter(x=dfsample.index, y=dfsample['BBU_30_2.0'],
#                          line=dict(color='rgba(0, 0, 255, 0.4)'),  # Upper Band
#                          name='Upper Bollinger Band',
#                          showlegend=True),
#               row=1, col=1)

# fig.add_trace(go.Scatter(x=dfsample.index, y=dfsample['BBM_30_2.0'],
#                          line=dict(color='rgba(0, 0, 255, 0.6)'),  # Middle Band
#                          name='Middle Bollinger Band',
#                          showlegend=True),
#               row=1, col=1)

# fig.add_trace(go.Scatter(x=dfsample.index, y=dfsample['BBL_30_2.0'],
#                          line=dict(color='rgba(0, 0, 255, 0.4)'),  # Lower Band
#                          name='Lower Bollinger Band',
#                          fill='tonexty',  # Fill between this line and the next upper band line
#                          fillcolor='rgba(0, 0, 255, 0.1)',  # Light blue fill between the bands
#                          showlegend=True),
#               row=1, col=1)


# Update layout to set background color to black and remove gridlines
fig.update_layout(
    width=1000,
    height=600,
    sliders=[],
    paper_bgcolor='black',  # Set the background color of the entire figure
    plot_bgcolor='black',   # Set the background color of the plotting area
    xaxis_showgrid=False,   # Remove x-axis gridlines
    yaxis_showgrid=False,   # Remove y-axis gridlines
)

# Show the plot
fig.show()


In [None]:
def ema_signal(df, backcandles):
    # Create boolean Series for conditions
    above = df['EMA_fast'] > df['EMA_slow']
    below = df['EMA_fast'] < df['EMA_slow']

    # Rolling window to check if condition is met consistently over the window
    above_all = above.rolling(window=backcandles).apply(lambda x: x.all(), raw=True).fillna(0).astype(bool)
    below_all = below.rolling(window=backcandles).apply(lambda x: x.all(), raw=True).fillna(0).astype(bool)

    # Assign signals based on conditions
    df['EMASignal'] = 0  # Default no signal
    df.loc[above_all, 'EMASignal'] = 2  # Signal 2 where EMA_fast consistently above EMA_slow
    df.loc[below_all, 'EMASignal'] = 1  # Signal 1 where EMA_fast consistently below EMA_slow

    return df

df = df[-60000:]
df.reset_index(inplace=True, drop=True)
df = ema_signal(dfsample,  7)

In [None]:
df

In [None]:
def total_signal(df):
    # Vectorized conditions for total_signal
    condition_buy = (df['EMASignal'] == 2) & (df['Close'] <= df['Lower_Envelope'])
    condition_sell = (df['EMASignal'] == 1) & (df['Close'] >= df['Upper_Envelope'])

    # Assigning signals based on conditions
    df['Total_Signal'] = 0  # Default no signal
    df.loc[condition_buy, 'Total_Signal'] = 2
    df.loc[condition_sell, 'Total_Signal'] = 1

total_signal(dfsample)


In [None]:
dfsample["TotalSignal"]=dfsample.Total_Signal
dfsample.TotalSignal.value_counts()

In [None]:
import numpy as np
def pointpos(x):
    if x['TotalSignal']==2:
        return x['Low']-1e-4
    elif x['TotalSignal']==1:
        return x['High']+1e-4
    else:
        return np.nan

df['pointpos'] = df.apply(lambda row: pointpos(row), axis=1)

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime
st=0
dfpl = df[st:st+350]
# Create a plot with 2 rows
fig = make_subplots(rows=3, cols=1)

# Add candlestick plot on the first row
fig.add_trace(go.Candlestick(x=dfpl.index,
                             open=dfpl['Open'],
                             high=dfpl['High'],
                             low=dfpl['Low'],
                             close=dfpl['Close']),
              row=1, col=1)

# Add Bollinger Bands, EMA lines on the same subplot
fig.add_trace(go.Scatter(x=dfpl.index, y=dfpl['BBL_15_2.2'],
                         line=dict(color='green', width=1),
                         name="BBL"),
              row=1, col=1)
fig.add_trace(go.Scatter(x=dfpl.index, y=dfpl['BBU_15_2.2'],
                         line=dict(color='green', width=1),
                         name="BBU"),
              row=1, col=1)
fig.add_trace(go.Scatter(x=dfpl.index, y=dfpl['EMA_fast'],
                         line=dict(color='black', width=1),
                         name="EMA_fast"),
              row=1, col=1)
fig.add_trace(go.Scatter(x=dfpl.index, y=dfpl['EMA_slow'],
                         line=dict(color='blue', width=1),
                         name="EMA_slow"),
              row=1, col=1)

# Add markers for trade entry points on the same subplot
fig.add_trace(go.Scatter(x=dfpl.index, y=dfpl['pointpos'], mode="markers",
                         marker=dict(size=8, color="MediumPurple"),
                         name="entry"),
              row=1, col=1)

fig.update_layout(width=1200, height=800, sliders=[])
fig.show()

In [None]:
from backtesting import Strategy
from backtesting import Backtest

dfopt = dfsample[0:]
def SIGNAL():
    return dfopt.TotalSignal

class MyStrat(Strategy):
    mysize = 3000
    slcoef = 1.1
    TPSLRatio = 1.5

    def init(self):
        super().init()
        self.signal1 = self.I(SIGNAL)

    def next(self):
        super().next()
        slatr = self.slcoef*self.data.ATR[-1]
        TPSLRatio = self.TPSLRatio

        if self.signal1==1 and len(self.trades)==0:
            sl1 = self.data.Close[-1] - slatr
            tp1 = self.data.Close[-1] + slatr*TPSLRatio
            self.buy(sl=sl1, tp=tp1, size=self.mysize)

        elif self.signal1==2 and len(self.trades)==0:
            sl1 = self.data.Close[-1] + slatr
            tp1 = self.data.Close[-1] - slatr*TPSLRatio
            self.sell(sl=sl1, tp=tp1, size=self.mysize)

bt = Backtest(dfopt, MyStrat, cash=250, margin=1/30)
stats, heatmap = bt.optimize(slcoef=[i/10 for i in range(10, 26)],
                    TPSLRatio=[i/10 for i in range(10, 26)],
                    maximize='Return [%]', max_tries=300,
                        random_state=0,
                        return_heatmap=True)
stats

In [None]:
stats["_strategy"]

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Convert multiindex series to dataframe
heatmap_df = heatmap.unstack()
plt.figure(figsize=(10, 8))
sns.heatmap(heatmap_df, annot=True, cmap='viridis', fmt='.0f')
plt.show()

In [None]:
from backtesting import Strategy
from backtesting import Backtest

dftest = df[:]
def SIGNAL():
    return dftest.TotalSignal

class MyStrat(Strategy):
    mysize = 3000
    slcoef = 2.6
    TPSLRatio = 2.6

    def init(self):
        super().init()
        self.signal1 = self.I(SIGNAL)

    def next(self):
        super().next()
        slatr = self.slcoef*self.data.ATR[-1]
        TPSLRatio = self.TPSLRatio

        if self.signal1==1 and len(self.trades)==0:
            sl1 = self.data.Close[-1] - slatr
            tp1 = self.data.Close[-1] + slatr*TPSLRatio
            self.buy(sl=sl1, tp=tp1, size=self.mysize)

        elif self.signal1==2 and len(self.trades)==0:
            sl1 = self.data.Close[-1] + slatr
            tp1 = self.data.Close[-1] - slatr*TPSLRatio
            self.sell(sl=sl1, tp=tp1, size=self.mysize)

bt = Backtest(dftest, MyStrat, cash=250, margin=1/30, commission=0.0002)
bt.run()

In [None]:
bt.plot()

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def gaussian_kernel(x, bandwidth):
    return (1 / (np.sqrt(2 * np.pi) * bandwidth)) * np.exp(-0.5 * ((x / bandwidth) ** 2))

def compute_weights(X, x, bandwidth):
    weights = [gaussian_kernel(x - i, bandwidth) for i in X]
    weights /= np.sum(weights) # Normalize to make the weights sum to 1
    return weights

# Generate synthetic price data
np.random.seed(0)
prices = np.array([2 for i in range(0, 100)])  # 100 price points

# Select the current price point (most recent)
current_price_index = 99  # Last price in the series
current_price = prices[current_price_index]

# Bandwidth (standard deviation of the kernel)
bandwidth = 10

# Compute weights for all preceding points
weights = compute_weights(range(0, 99), current_price_index, bandwidth)

# Plot the weights distribution
plt.figure(figsize=(10, 5))
plt.bar(range(current_price_index), weights, color='blue')
plt.title('Weight Distribution for Nadaraya-Watson Estimator')
plt.xlabel('Index of Price Points')
plt.ylabel('Weight')
plt.grid(False)
plt.gca().set_facecolor('black') # Change background color of the plot to black
plt.show()