# Building And Testing A Complete Trading System

In [1]:
import numpy as np
import pandas as pd
import yfinance as yf
import pandas_ta as pa
import plotly.graph_objects as go
from backtesting import Strategy
from backtesting.lib import FractionalBacktest
from tqdm import tqdm



### Data Acquisition

In [2]:
def get_data(symbol: str, period: str = '1000d', interval: str = '1d') -> pd.DataFrame:
    """Download historical price data from Yahoo Finance."""
    data = yf.download(tickers=symbol, period=period, interval=interval, auto_adjust=True)
    data.columns = ['Open', 'High', 'Low', 'Close', 'Volume']
    data = data.reset_index()
    return data

In [3]:
data = get_data('BTC-USD')
data

[*********************100%***********************]  1 of 1 completed


Unnamed: 0,Date,Open,High,Low,Close,Volume
0,2023-05-08,27694.273438,28663.271484,27310.134766,28450.457031,19122903752
1,2023-05-09,27658.775391,27821.400391,27375.601562,27695.068359,14128593256
2,2023-05-10,27621.755859,28322.687500,26883.669922,27654.636719,20656025026
3,2023-05-11,27000.789062,27621.941406,26781.826172,27621.085938,16724343943
4,2023-05-12,26804.990234,27055.646484,25878.429688,26987.662109,19313599897
...,...,...,...,...,...,...
995,2026-01-27,89102.570312,89427.125000,87228.921875,88257.476562,38744942267
996,2026-01-28,89184.570312,90439.289062,88721.460938,89104.046875,39807419296
997,2026-01-29,84561.585938,89200.781250,83250.601562,89169.851562,64653083162
998,2026-01-30,84128.656250,84602.164062,81071.476562,84562.726562,72083816087


### Rejection Pattern Identification

In [4]:
def identify_rejection(data: pd.DataFrame) -> pd.DataFrame:
    """
    Identify bullish and bearish rejection candlestick patterns.
    
    Returns:
        0 = No rejection
        1 = Bearish rejection (upper wick rejection)
        2 = Bullish rejection (lower wick rejection)
    """
    def classify_candle(row):
        body = abs(row['Close'] - row['Open'])
        upper_wick = row['High'] - max(row['Open'], row['Close'])
        lower_wick = min(row['Open'], row['Close']) - row['Low']
        
        # Minimum body size requirement
        min_body = row['Open'] * 0.001
        
        # Bullish rejection (hammer-like pattern)
        if (lower_wick > 1.5 * body and 
            upper_wick < 0.8 * body and 
            body > min_body):
            return 2
        
        # Bearish rejection (shooting star-like pattern)
        elif (upper_wick > 1.5 * body and 
              lower_wick < 0.8 * body and 
              body > min_body):
            return 1
        
        return 0
    
    data['rejection'] = data.apply(classify_candle, axis=1)
    return data

In [5]:
data = identify_rejection(data)

In [6]:
data[data["rejection"] != 0]

Unnamed: 0,Date,Open,High,Low,Close,Volume,rejection
4,2023-05-12,26804.990234,27055.646484,25878.429688,26987.662109,19313599897,2
7,2023-05-15,27192.693359,27646.347656,26766.097656,26931.384766,14413231792,1
17,2023-05-25,26476.207031,26591.519531,25890.593750,26329.460938,13851122697,2
31,2023-06-08,26508.216797,26797.513672,26246.664062,26347.654297,11904824295,1
47,2023-06-24,30548.695312,30804.148438,30290.146484,30708.738281,12147822496,2
...,...,...,...,...,...,...,...
947,2025-12-10,92020.945312,94477.156250,91640.132812,92695.234375,65420694513,1
962,2025-12-25,87234.742188,88501.789062,86949.257812,87608.320312,19953216347,1
966,2025-12-29,87138.140625,90299.156250,86717.914062,87835.789062,48411625849,1
976,2026-01-08,91027.125000,91485.851562,89233.875000,91309.640625,42386697030,2


### Support and Resistance Dectection

In [7]:
def support(df: pd.DataFrame, index: int, n_before: int, n_after: int) -> bool:
    """Check if a candlestick forms a support level."""
    if (df.Low[index - n_before:index].min() < df.Low[index] or
        df.Low[index + 1:index + n_after + 1].min() < df.Low[index]):
        return False
    return True


def resistance(df: pd.DataFrame, index: int, n_before: int, n_after: int) -> bool:
    """Check if a candlestick forms a resistance level."""
    if (df.High[index - n_before:index].max() > df.High[index] or
        df.High[index + 1:index + n_after + 1].max() > df.High[index]):
        return False
    return True


def close_resistance(index: int, levels: list, limit: float, df: pd.DataFrame) -> float:
    """Check if price is testing a resistance level."""
    if not levels:
        return 0
    
    nearest_level = min(levels, key=lambda x: abs(x - df['High'][index]))
    
    c1 = abs(df['High'][index] - nearest_level) <= limit
    c2 = abs(max(df['Open'][index], df['Close'][index]) - nearest_level) <= limit
    c3 = min(df['Open'][index], df['Close'][index]) < nearest_level
    c4 = df['Low'][index] < nearest_level
    
    if (c1 or c2) and c3 and c4:
        return nearest_level
    return 0


def close_support(index: int, levels: list, limit: float, df: pd.DataFrame) -> float:
    """Check if price is testing a support level."""
    if not levels:
        return 0
    
    nearest_level = min(levels, key=lambda x: abs(x - df['Low'][index]))
    
    c1 = abs(df['Low'][index] - nearest_level) <= limit
    c2 = abs(min(df['Open'][index], df['Close'][index]) - nearest_level) <= limit
    c3 = max(df['Open'][index], df['Close'][index]) > nearest_level
    c4 = df['High'][index] > nearest_level
    
    if (c1 or c2) and c3 and c4:
        return nearest_level
    return 0


def is_below_resistance(index: int, lookback: int, level: float, df: pd.DataFrame) -> bool:
    """Check if recent price action stayed below resistance."""
    return df.loc[index - lookback:index - 1, 'High'].max() < level


def is_above_support(index: int, lookback: int, level: float, df: pd.DataFrame) -> bool:
    """Check if recent price action stayed above support."""
    return df.loc[index - lookback:index - 1, 'Low'].min() > level

### Signal Generation

In [8]:
def merge_close_levels(levels: list, threshold: float = 0.001) -> list:
    """Merge support/resistance levels that are very close together."""
    if len(levels) <= 1:
        return levels
    
    levels.sort()
    merged = []
    i = 0
    
    while i < len(levels):
        current = levels[i]
        # Skip levels that are too close to the previous one
        if merged and abs(current - merged[-1]) / current <= threshold:
            i += 1
            continue
        merged.append(current)
        i += 1
    
    return merged


def check_candle_signal(index: int, n1: int, n2: int, level_lookback: int, 
                        window_lookback: int, df: pd.DataFrame) -> int:
    """
    Generate trading signals based on rejection patterns at support/resistance.
    
    Returns:
        0 = No signal
        1 = Sell signal (bearish rejection at resistance)
        2 = Buy signal (bullish rejection at support)
    """
    supports = []
    resistances = []
    
    # Find support and resistance levels
    for i in range(index - level_lookback, index - n2 + 1):
        if support(df, i, n1, n2):
            supports.append(df.Low[i])
        if resistance(df, i, n1, n2):
            resistances.append(df.High[i])
    
    # Merge close levels
    supports = merge_close_levels(supports)
    resistances = merge_close_levels(resistances, threshold=0.001)
    
    # Combine all levels
    all_levels = merge_close_levels(supports + resistances)
    
    # Check for signals
    price_threshold = df.Close[index] * 0.003
    close_resist = close_resistance(index, all_levels, price_threshold, df)
    close_supp = close_support(index, all_levels, price_threshold, df)
    
    # Bearish rejection at resistance
    if (df.rejection[index] == 1 and close_resist and 
        is_below_resistance(index, window_lookback, close_resist, df)):
        return 1
    
    # Bullish rejection at support
    elif (df.rejection[index] == 2 and close_supp and 
          is_above_support(index, window_lookback, close_supp, df)):
        return 2
    
    return 0


def generate_signals(data: pd.DataFrame, n1: int = 8, n2: int = 8, 
                     level_lookback: int = 60) -> pd.DataFrame:
    """Generate trading signals for the entire dataset."""
    window_lookback = n2
    signals = [0] * len(data)
    
    for i in tqdm(range(level_lookback + n1, len(data) - n2), 
                  desc="Generating signals"):
        signals[i] = check_candle_signal(i, n1, n2, level_lookback, 
                                         window_lookback, data)
    
    data['signal'] = signals
    return data

In [9]:
data = generate_signals(data, n1=8, n2=8, level_lookback=60)

Generating signals: 100%|██████████| 924/924 [00:02<00:00, 335.93it/s]


### Visualization

In [10]:
def plot_candlestick_with_signals(data: pd.DataFrame, start: int = 10, 
                                  end: int = 110) -> None:
    """Plot candlestick chart with trading signals."""
    dfpl = data.iloc[start:end].copy()
    
    # Add signal markers
    def get_marker_position(row):
        if row['signal'] == 1:  # Sell signal
            return row['High'] + 1e-4
        elif row['signal'] == 2:  # Buy signal
            return row['Low'] - 1e-4
        return np.nan
    
    dfpl['marker_pos'] = dfpl.apply(get_marker_position, axis=1)
    
    # Create figure
    fig = go.Figure(data=[
        go.Candlestick(
            x=dfpl.index,
            open=dfpl['Open'],
            high=dfpl['High'],
            low=dfpl['Low'],
            close=dfpl['Close'],
            name='Price'
        )
    ])
    
    # Add signal markers
    fig.add_scatter(
        x=dfpl.index,
        y=dfpl['marker_pos'],
        mode="markers",
        marker=dict(size=8, color="MediumPurple"),
        name="Signal"
    )
    
    fig.update_layout(
        title="BTC Price with Trading Signals",
        autosize=False,
        width=1000,
        height=800,
        xaxis_title="Date",
        yaxis_title="Price (USD)"
    )
    
    fig.show()

### Trading Strategies

In [11]:
class FixedSLTPStrategy(Strategy):
    """Strategy with fixed stop-loss and take-profit based on percentage."""
    
    ratio = 2
    risk_perc = 0.1
    
    def init(self):
        super().init()
        self.signal = self.I(lambda: self.data.signal)
    
    def next(self):
        super().next()
        
        current_price = self.data.Close[-1]
        sl_distance = current_price * self.risk_perc
        tp_distance = sl_distance * self.ratio
        
        # Buy signal
        if self.signal == 2 and len(self.trades) == 0:
            sl = max(current_price - sl_distance, current_price * 0.01)  # Ensure positive SL
            tp = current_price + tp_distance
            self.buy(sl=sl, tp=tp)
        
        # Sell signal
        elif self.signal == 1 and len(self.trades) == 0:
            sl = current_price + sl_distance
            tp = max(current_price - tp_distance, current_price * 0.01)  # Ensure positive TP
            self.sell(sl=sl, tp=tp)

In [12]:
class RSIExitStrategy(Strategy):
    """Strategy with RSI-based exits."""
    
    ratio = 1.5
    risk_perc = 0.1
    
    def init(self):
        super().init()
        self.signal = self.I(lambda: self.data.signal)
    
    def next(self):
        super().next()
        
        # Exit conditions based on RSI
        if self.trades:
            if self.trades[-1].is_long and self.data.RSI[-1] >= 80:
                self.trades[-1].close()
            elif self.trades[-1].is_short and self.data.RSI[-1] <= 20:
                self.trades[-1].close()
        
        # Only enter new trades if no open positions
        if not self.trades:
            current_price = self.data.Close[-1]
            sl_distance = current_price * self.risk_perc
            tp_distance = sl_distance * self.ratio
            
            if self.signal == 2:
                sl = max(current_price - sl_distance, current_price * 0.01)
                tp = current_price + tp_distance
                self.buy(sl=sl, tp=tp)
            elif self.signal == 1:
                sl = current_price + sl_distance
                tp = max(current_price - tp_distance, current_price * 0.01)
                self.sell(sl=sl, tp=tp)

In [13]:
class ATRBasedStrategy(Strategy):
    """Strategy using ATR for stop-loss and take-profit."""
    
    atr_multiplier = 3
    ratio = 2
    
    def init(self):
        super().init()
        self.signal = self.I(lambda: self.data.signal)
    
    def next(self):
        super().next()
        
        if len(self.trades) == 0:
            current_price = self.data.Close[-1]
            atr = self.data.ATR[-1]
            sl_distance = atr * self.atr_multiplier
            tp_distance = sl_distance * self.ratio
            
            if self.signal == 2:
                sl = max(current_price - sl_distance, current_price * 0.01)
                tp = current_price + tp_distance
                self.buy(sl=sl, tp=tp)
            elif self.signal == 1:
                sl = current_price + sl_distance
                tp = max(current_price - tp_distance, current_price * 0.01)
                self.sell(sl=sl, tp=tp)

In [14]:
class TrailingStopStrategy(Strategy):
    """Strategy with trailing stop-loss."""
    
    trailing_percent = 0.02
    
    def init(self):
        super().init()
        self.signal = self.I(lambda: self.data.signal)
    
    def next(self):
        super().next()
        
        current_price = self.data.Close[-1]
        trail_distance = current_price * self.trailing_percent
        
        # Update trailing stops
        for trade in self.trades:
            if trade.is_long:
                new_sl = current_price - trail_distance
                trade.sl = max(trade.sl or -np.inf, new_sl, current_price * 0.01)
            else:
                new_sl = current_price + trail_distance
                trade.sl = min(trade.sl or np.inf, new_sl)
        
        # Enter new positions
        if not self.trades:
            if self.signal == 2:
                sl = max(current_price - trail_distance, current_price * 0.01)
                self.buy(sl=sl)
            elif self.signal == 1:
                sl = current_price + trail_distance
                self.sell(sl=sl)

In [15]:
class ATRTrailingStopStrategy(Strategy):
    """Strategy with ATR-based trailing stop-loss."""
    
    atr_divisor = 0.6
    
    def init(self):
        super().init()
        self.signal = self.I(lambda: self.data.signal)
        self.trail_distance = 0
    
    def next(self):
        super().next()
        
        current_price = self.data.Close[-1]
        
        # Update trailing stops
        for trade in self.trades:
            if trade.is_long:
                new_sl = current_price - self.trail_distance
                trade.sl = max(trade.sl or -np.inf, new_sl, current_price * 0.01)
            else:
                new_sl = current_price + self.trail_distance
                trade.sl = min(trade.sl or np.inf, new_sl)
        
        # Enter new positions
        if not self.trades:
            self.trail_distance = self.data.ATR[-1] / self.atr_divisor
            
            if self.signal == 2:
                sl = max(current_price - self.trail_distance, current_price * 0.01)
                self.buy(sl=sl)
            elif self.signal == 1:
                sl = current_price + self.trail_distance
                self.sell(sl=sl)

In [16]:
class MultiTargetStrategy(Strategy):
    """Strategy with multiple take-profit targets."""
    
    lot_size = 1
    ratio = 1.0
    risk_perc = 0.1
    
    def init(self):
        super().init()
        self.signal = self.I(lambda: self.data.signal)
    
    def next(self):
        super().next()
        
        if not self.trades:
            current_price = self.data.Close[-1]
            sl_distance = current_price * self.risk_perc
            
            # Two targets: 80% and 120% of the ratio
            tp1_distance = sl_distance * self.ratio * 0.8
            tp2_distance = sl_distance * self.ratio * 1.2
            
            if self.signal == 2:
                sl = max(current_price - sl_distance, current_price * 0.01)
                tp1 = current_price + tp1_distance
                tp2 = current_price + tp2_distance
                self.buy(sl=sl, tp=tp1, size=self.lot_size)
                self.buy(sl=sl, tp=tp2, size=self.lot_size)
            
            elif self.signal == 1:
                sl = current_price + sl_distance
                tp1 = max(current_price - tp1_distance, current_price * 0.01)
                tp2 = max(current_price - tp2_distance, current_price * 0.01)
                self.sell(sl=sl, tp=tp1, size=self.lot_size)
                self.sell(sl=sl, tp=tp2, size=self.lot_size)

### Backtesting Strategies

In [17]:
data.set_index("Date", inplace=True)

In [18]:
data['ATR'] = pa.atr(high=data.High, low=data.Low, close=data.Close, length=14)
data['RSI'] = pa.rsi(data.Close, length=5)

In [19]:
print("\n" + "=" * 60)
print("BACKTEST RESULTS")
print("=" * 60)

strategies = [
    ("Fixed SL/TP", FixedSLTPStrategy, {'commission': 0.02}),
    ("RSI Exit", RSIExitStrategy, {'commission': 0.05}),
    ("ATR-based", ATRBasedStrategy, {'commission': 0.0}),
    ("Trailing Stop", TrailingStopStrategy, {'commission': 0.0}),
    ("ATR Trailing", ATRTrailingStopStrategy, {'commission': 0.0}),
    ("Multi-Target", MultiTargetStrategy, {'commission': 0.05, 'margin': 1.0}),
]

results = {}
for name, strategy_class, params in strategies:
    print(f"\n{name} Strategy:")
    bt = FractionalBacktest(data, strategy_class, cash=100_000, **params, finalize_trades=True)
    stat = bt.run()
    results[name] = stat
    print(f"  Return: {stat['Return [%]']:.2f}%")
    print(f"  Sharpe Ratio: {stat['Sharpe Ratio']:.2f}")
    print(f"  Max Drawdown: {stat['Max. Drawdown [%]']:.2f}%")
    print(f"  # Trades: {stat['# Trades']}")


BACKTEST RESULTS

Fixed SL/TP Strategy:


FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

  Return: -13.70%
  Sharpe Ratio: -0.74
  Max Drawdown: -15.03%
  # Trades: 1

RSI Exit Strategy:


FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

  Return: -18.73%
  Sharpe Ratio: -0.89
  Max Drawdown: -18.73%
  # Trades: 1

ATR-based Strategy:


FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

  Return: -16.90%
  Sharpe Ratio: -0.28
  Max Drawdown: -34.81%
  # Trades: 1

Trailing Stop Strategy:


FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

  Return: -1.28%
  Sharpe Ratio: -0.13
  Max Drawdown: -4.72%
  # Trades: 1

ATR Trailing Strategy:


FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

  Return: -16.90%
  Sharpe Ratio: -0.28
  Max Drawdown: -34.81%
  # Trades: 1

Multi-Target Strategy:


FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

  Return: -0.00%
  Sharpe Ratio: nan
  Max Drawdown: -0.00%
  # Trades: 2


In [20]:
print("\n" + "=" * 60)
print("OPTIMIZATION: RSI Exit Strategy")
print("=" * 60)

bt = FractionalBacktest(data, RSIExitStrategy, cash=100_000, 
                        commission=0.05, finalize_trades=True)
param_grid = {
    'ratio': list(np.arange(1.5, 3.5, 0.5)),
    'risk_perc': list(np.arange(0.06, 0.2, 0.02))
}

print("Running optimization...")
optimized = bt.optimize(**param_grid, random_state=5, maximize='Return [%]')
print(f"\nBest Return: {optimized['Return [%]']:.2f}%")
print(f"Best Parameters: ratio={optimized._strategy.ratio}, "f"risk_perc={optimized._strategy.risk_perc:.3f}")


OPTIMIZATION: RSI Exit Strategy
Running optimization...


  output = _optimize_grid()


Backtest.optimize:   0%|          | 0/32 [00:00<?, ?it/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]

FractionalBacktest.run:   0%|          | 0/999 [00:00<?, ?bar/s]


Best Return: -15.49%
Best Parameters: ratio=1.5, risk_perc=0.060
