In [None]:
import numpy as np
import pandas as pd
import pandas_ta as ta

from plotly import graph_objects as go
from tqdm import tqdm

import logging

log = logging.Logger(name="logger", level=logging.INFO)

In [None]:
xau = pd.read_csv("../data/xauusd_1h.csv", index_col="datetime", parse_dates=['datetime'])
xau.head()

In [None]:
xau.index[0]

In [None]:
def pivot_high(highs:pd.Series, window:int=14):
    len = highs.size
    ph = np.zeros(shape=(len, ), dtype=float)
    
    for i in range(window, len-window):
        if highs.iloc[i] == highs.iloc[i-window:i+window].max():
            ph[i] = highs.iloc[i]
    return ph

def pivot_low(lows:pd.Series, window:int=14):
    len = lows.size
    pl = np.zeros(shape=(len, ), dtype=float)
    
    for i in range(window, len-window):
        if lows.iloc[i] == lows.iloc[i-window:i+window].min():
            pl[i] = lows.iloc[i]
    return pl

In [None]:
def pivots(highs:pd.Series, lows:pd.Series, window:int) -> pd.Series:
    len = highs.size
    if lows.size != len:
        raise BufferError("`highs` and `lows` must have the same size!")
    pivots = pd.Series(data=0, index=highs.index)
    pivot_values = pd.Series(data=0, index=highs.index, dtype=np.float64)
    last_pivot = -1
    last_pivot_index = 0
    # last_pivot_value = highs.iloc[0]
    
    for i in tqdm(range(window, len-window)):
        if highs.iloc[i] == highs.iloc[i-window:i+window].max():
            if last_pivot == 1 and highs.iloc[last_pivot_index] < highs.iloc[i]:     # Previous pivot is a lower pivot high?!
                pivots.iloc[last_pivot_index] = 0       # Invalidate the previous pivot high
                pivot_values.iloc[last_pivot_index] = 0
                last_pivot_index = i                    # Set new pivot high
                pivots.iloc[last_pivot_index] = last_pivot
                pivot_values.iloc[last_pivot_index] = highs.iloc[last_pivot_index]
            elif last_pivot == -1:  # Previous pivot is a pivot low?!
                last_pivot_index = i            # Set pivot high
                last_pivot = 1
                pivots.iloc[last_pivot_index] = last_pivot
                pivot_values.iloc[last_pivot_index] = highs.iloc[last_pivot_index]
        elif lows.iloc[i] == lows.iloc[i-window:i+window].min():
            if last_pivot == -1 and lows.iloc[last_pivot_index] > lows.iloc[i]:    # Previous pivot is a higher pivot low?!
                pivots.iloc[last_pivot_index] = 0       # Invalidate the previous pivot low
                pivot_values.iloc[last_pivot_index] = 0
                last_pivot_index = i                    # Set new pivot low
                pivots.iloc[last_pivot_index] = last_pivot
                pivot_values.iloc[last_pivot_index] = lows.iloc[last_pivot_index]
            elif last_pivot == 1:       # Previous pivot is a pivot high
                last_pivot_index = i            # Set pivot low
                last_pivot = -1
                pivots.iloc[last_pivot_index] = last_pivot
                pivot_values.iloc[last_pivot_index] = lows.iloc[last_pivot_index]
    return pivots, pivot_values

In [None]:
xau['pivots'], xau['pivot_values'] = pivots(xau['high'], xau['low'], window=2)

In [None]:
def plot_data(df):
    fig = go.Figure(
        data=[go.Candlestick(x=df.index,
                            close=df['close'],
                            open=df['open'],
                            low=df['low'],
                            high=df['high'])]
    )

    fig.update_layout(title="XAUUSD - H1",
                    xaxis_title="Date Time",
                    yaxis_title="Price",
                    xaxis=dict(type="category"))

    fig.show()

In [None]:
xau[xau['pivots'] == 1].index



In [None]:
xau['pivot_values'].loc[xau['pivot_values'] != 0]

In [None]:
def head_and_shoulders(df:pd.DataFrame, threshold:int, log:logging.Logger) -> pd.Series:
    # if ['pivots', 'open', 'close'] not in df.columns.to_list():
        # raise ValueError("The columns `['pivots', 'open', 'close']` must exist...")

    buy_sell = pd.Series(data=0, index=df.index)
    len = df.shape[0]
    print(len, end="\n\n\n")
    pip_value = 0.1
    
    pivots_iloc = np.where(df['pivots'] != 0)[0]
    pivots_index = df.iloc[pivots_iloc].index
    
    for i in tqdm(range(len)):
        prev_5_pivots_index = pivots_index[pivots_index < df.index[i]]
        # print(prev_5_pivots_index)
        if prev_5_pivots_index.size < 5:
            continue
        else:
            prev_5_pivots_index = prev_5_pivots_index[-5:]
        prev_5_pivots = df['pivots'].loc[prev_5_pivots_index]
        prev_5_pivot_values = df['pivot_values'].loc[prev_5_pivots_index]

        if prev_5_pivots.loc[prev_5_pivots_index[0]] == 1:
            log.info("Start pivot high on left shoulder!")
            # if np.abs(prev_5_pivot_values.iloc[0] - prev_5_pivot_values.iloc[4]) < threshold * pip_value:
            shoulders = prev_5_pivot_values.iloc[0]
            if np.abs(prev_5_pivot_values.iloc[1] - prev_5_pivot_values.iloc[3]) < threshold * pip_value:
                neck_line = prev_5_pivot_values.iloc[1]
                if prev_5_pivot_values.iloc[2] > prev_5_pivot_values.iloc[0]:
                    if df['close'].iloc[i-1] < neck_line and df['open'].iloc[i-1] > neck_line:
                        buy_sell.iloc[i] = -1   # Sell

        elif prev_5_pivots.loc[prev_5_pivots_index[0]] == -1:
                    log.info("Start pivot low on left shoulder!")
                    # if np.abs(prev_5_pivot_values.iloc[0] - prev_5_pivot_values.iloc[4]) < threshold * pip_value:
                    shoulders = prev_5_pivot_values.iloc[0]
                    if np.abs(prev_5_pivot_values.iloc[1] - prev_5_pivot_values.iloc[3]) < threshold * pip_value:
                        neck_line = prev_5_pivot_values.iloc[1]
                        if prev_5_pivot_values.iloc[2] < prev_5_pivot_values.iloc[0]:
                            if df['close'].iloc[i-1] > neck_line and df['open'].iloc[i-1] < neck_line:
                                buy_sell.iloc[i] = 1   # Buy
    return buy_sell

In [None]:
log.setLevel(logging.INFO)
buy_sell = head_and_shoulders(xau, 10, log=log)

In [None]:
xau['buy_sell'] = buy_sell

buy_sell.loc[buy_sell != 0].count()

In [None]:
buy_sell.index.size, xau.index.size

In [None]:

def plot_candlestick(df:pd.DataFrame):
    fig = go.Figure(data=[go.Candlestick(
        x=df.index,
        close=df.close,
        open=df.open,
        high=df.high,
        low=df.low,
        # name="XAU_USD -- H1 -- OANDA",
        )])

    fig.update_layout(
        width=1200,
        height=800,
        plot_bgcolor='black',
        paper_bgcolor='black',
        font=dict(color='white'),
        xaxis=dict(type='category',
                   showgrid=False,
                   zeroline=False),
        yaxis=dict(showgrid=False,
                   zeroline=False)
    )

    fig.show()

In [None]:
plot_candlestick(xau.iloc[0:500])

In [None]:
xau_cap = xau.rename(columns={
    'open': 'Open',
    'high': 'High',
    'low': 'Low',
    'close': 'Close',
    'volume': 'Volume'
})

In [None]:
import backtrader as bt
from backtesting.backtesting import Strategy
from backtesting.lib import Sequence

class SIGNAL(bt.Indicator):
    lines = ('signal', )
    params = ()
    def __init__(self):
        self.lines.signal = bt.feeds.PandasData(dataname=pd.DataFrame(xau['buy_sell'], columns=['buy_sell']))

def signal():
    return xau['buy_sell']


class HnSStrategy(Strategy):       
    sl_points = 400
    point_size = .01
    rrr = 2.5
    lot_size = 0.1

    def init(self):
        super().init()
        self.signal = self.I(signal, name="BUY/SELL")

    def next(self):
        super().next()
        if self.position:
            return

        if self.signal == -1:
            sl = self.data.Close[-1] + self.sl_points * self.point_size
            print(self.data.Close[-1])
            tp = self.data.Close[-1] - self.rrr * self.sl_points * self.point_size
            self.sell(size=self.lot_size, sl=sl, tp=tp, limit=None)
        elif self.signal == 1:
            sl = self.data.Close[-1] - self.sl_points * self.point_size
            print(self.data.Close[-1])
            tp = self.data.Close[-1] + self.rrr * self.sl_points * self.point_size
            self.buy(size=self.lot_size, sl=sl, tp=tp, limit=None)

In [None]:
from backtesting.backtesting import Backtest

params = {'sl_points': 200,
          'rrr': 1.5}

bt = Backtest(data=xau_cap, strategy=HnSStrategy, cash=100_000.0, commission=0)

results = bt.run()

In [None]:
from matplotlib import pyplot as plt

results['_equity_curve'].plot(y='Equity')
plt.show()


In [None]:
results['_equity_curve']['DrawdownPct'].plot()
plt.show()