# In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, time


def find_swing_points(df, window=3):
    """
    Find swing highs and lows using a simple window approach

    A bar is a swing high when its "high" is strictly greater than the high
    of every one of the `window` bars before it and the `window` bars after
    it (a tie with any neighbour disqualifies it). Swing lows are the mirror
    image on the "low" column. The first and last `window` bars can never be
    swing points because they lack a full set of neighbours.

    Note that a swing at bar i depends on bars up to i + window, so it is
    only knowable `window` bars after it occurs.

    Parameters:
    df: DataFrame with OHLC data (only the "high" and "low" columns are read)
    window: Number of periods on each side to confirm swing point
            (must be a non-negative integer)

    Returns:
    Copy of df with swing_high and swing_low columns; each holds the bar's
    high/low at swing points and NaN everywhere else

    Raises:
    ValueError: if window is negative
    """
    if window < 0:
        raise ValueError("window must be non-negative")

    df = df.copy()
    highs = df["high"].to_numpy(dtype=float)
    lows = df["low"].to_numpy(dtype=float)
    n = len(df)

    swing_high = np.full(n, np.nan)
    swing_low = np.full(n, np.nan)

    # Only bars with `window` neighbours on both sides are candidates.
    if n >= 2 * window + 1:
        core = slice(window, n - window)
        center_high = highs[core]
        center_low = lows[core]
        is_high = np.ones(n - 2 * window, dtype=bool)
        is_low = np.ones(n - 2 * window, dtype=bool)

        # Compare every candidate with its neighbour `offset` bars away on
        # each side. The test is written as "not (neighbour >= centre)" so
        # that NaN neighbours never disqualify a bar (NaN comparisons are
        # False), exactly as in a bar-by-bar comparison loop.
        for offset in range(1, window + 1):
            before = slice(window - offset, n - window - offset)
            after = slice(window + offset, n - window + offset)
            is_high &= ~(highs[before] >= center_high)
            is_high &= ~(highs[after] >= center_high)
            is_low &= ~(lows[before] <= center_low)
            is_low &= ~(lows[after] <= center_low)

        swing_high[core] = np.where(is_high, center_high, np.nan)
        swing_low[core] = np.where(is_low, center_low, np.nan)

    df["swing_high"] = swing_high
    df["swing_low"] = swing_low
    return df


def _find_exit(session, entry_pos, signal, stop_loss, target):
    """Return (exit_price, exit_reason, exit_time) for a trade entered at row entry_pos of session."""
    later = session.iloc[entry_pos + 1 :]
    highs = later["high"].to_numpy(dtype=float)
    lows = later["low"].to_numpy(dtype=float)

    if signal == "LONG":
        target_hit = highs >= target
        stop_hit = lows <= stop_loss
    else:  # SHORT
        target_hit = lows <= target
        stop_hit = highs >= stop_loss

    hits = np.flatnonzero(target_hit | stop_hit)
    if hits.size == 0:
        # Neither level was touched: close at the last candle of the day.
        last_candle = session.iloc[-1]
        return last_candle["close"], "EOD", last_candle["datetime"]

    first_hit = hits[0]
    exit_time = later["datetime"].iloc[first_hit]
    # When one candle touches both levels the target is assumed to fill
    # first; intrabar order cannot be recovered from OHLC bars.
    if target_hit[first_hit]:
        return target, "TARGET", exit_time
    return stop_loss, "STOP_LOSS", exit_time


def _summarize_backtest(trades_df, equity_df, initial_capital, final_capital):
    """Build the results dictionary (same keys with or without trades)."""
    if trades_df.empty:
        return {
            "initial_capital": initial_capital,
            "final_capital": initial_capital,
            "total_pnl": 0,
            "total_return": 0,
            "total_trades": 0,
            "winning_trades": 0,
            "losing_trades": 0,
            "win_ratio": 0,
            "avg_win": 0,
            "avg_loss": 0,
            "max_drawdown": 0,
            "target_hits": 0,
            "stop_loss_hits": 0,
            "eod_exits": 0,
            "long_trades": 0,
            "short_trades": 0,
            "trades_df": trades_df,
            "equity_curve": equity_df,
        }

    winning_trades = trades_df[trades_df["total_pnl"] > 0]
    losing_trades = trades_df[trades_df["total_pnl"] <= 0]

    # Start the capital path at the initial capital so that a loss on the
    # very first trade is measured as a drawdown.
    capital_path = pd.concat(
        [
            pd.Series([initial_capital], dtype=float),
            equity_df["capital"].astype(float),
        ],
        ignore_index=True,
    )

    exit_reasons = trades_df["exit_reason"]
    signals = trades_df["signal"]

    return {
        "initial_capital": initial_capital,
        "final_capital": final_capital,
        "total_pnl": final_capital - initial_capital,
        "total_return": ((final_capital - initial_capital) / initial_capital) * 100,
        "total_trades": len(trades_df),
        "winning_trades": len(winning_trades),
        "losing_trades": len(losing_trades),
        "win_ratio": len(winning_trades) / len(trades_df) * 100,
        "avg_win": winning_trades["total_pnl"].mean() if len(winning_trades) > 0 else 0,
        "avg_loss": losing_trades["total_pnl"].mean() if len(losing_trades) > 0 else 0,
        "max_drawdown": calculate_max_drawdown(capital_path),
        "target_hits": int((exit_reasons == "TARGET").sum()),
        "stop_loss_hits": int((exit_reasons == "STOP_LOSS").sum()),
        "eod_exits": int((exit_reasons == "EOD").sum()),
        "long_trades": int((signals == "LONG").sum()),
        "short_trades": int((signals == "SHORT").sum()),
        "trades_df": trades_df,
        "equity_curve": equity_df,
    }


def backtest_first_15min_breakout_with_equity(
    df,
    initial_capital=50000,
    risk_reward_ratio=2,
    swing_window=3,
    risk_fraction=0.02,
    range_start=time(9, 15),
    range_end=time(9, 30),
):
    """
    Backtest an opening-range breakout strategy with equity curve tracking

    For every trading day the high and low of the bars stamped between
    range_start and range_end (both inclusive) form the opening range. The
    first later bar that closes outside that range, and for which a valid
    stop exists, opens the single trade of the day at that bar's close:

    SL: Most recent confirmed swing low of the day (for long) /
        most recent confirmed swing high of the day (for short)
    Target: risk_reward_ratio x the SL distance

    A swing point only counts once swing_window later bars have closed, so
    the stop never uses information from bars after the signal bar. Exits
    are scanned from the bar after entry; if one bar touches both target and
    stop, the target is assumed to be hit first. Trades still open at the
    last bar of the day are closed at that bar's close ("EOD").

    Parameters:
    df: DataFrame with columns ['date', 'open', 'high', 'low', 'close'];
        'date' must be parseable by pd.to_datetime and carry the bar time
    initial_capital: Starting capital
    risk_reward_ratio: Target profit = risk_reward_ratio * stop_loss
    swing_window: Window for swing point detection
    risk_fraction: Fraction of current capital risked per trade
    range_start: Time of the first bar of the opening range
    range_end: Time of the last bar of the opening range

    Returns:
    Dictionary with backtest results including equity curve. The same keys
    are present whether or not any trade was taken: summary statistics plus
    'trades_df' (one row per trade) and 'equity_curve' (capital after each
    trade, keyed by exit time).
    """

    # Ensure proper data types
    df = df.copy()
    df["datetime"] = pd.to_datetime(df["date"])
    df["date"] = df["datetime"].dt.date

    # Sort by datetime
    df = df.sort_values("datetime").reset_index(drop=True)

    # Find swing points (computed over the whole series, not per day)
    df = find_swing_points(df, swing_window)

    trades = []
    equity_curve = []
    current_capital = initial_capital

    # Group by date to process each trading day
    for date, day_data in df.groupby("date"):
        day_data = day_data.reset_index(drop=True)
        bar_times = day_data["datetime"].dt.time

        opening_range = day_data[(bar_times >= range_start) & (bar_times <= range_end)]
        if len(opening_range) < 2:  # Need at least some data for the opening range
            continue

        first_15_high = opening_range["high"].max()
        first_15_low = opening_range["low"].min()

        after_930_data = day_data[bar_times > range_end].reset_index(drop=True)
        if len(after_930_data) == 0:
            continue

        # The day is sorted by time, so the post-range bars are the tail of
        # day_data; this offset maps their positions back into day_data even
        # when the day contains bars stamped before range_start.
        session_offset = len(day_data) - len(after_930_data)

        closes = after_930_data["close"].to_numpy(dtype=float)
        breakout_rows = np.flatnonzero(
            (closes > first_15_high) | (closes < first_15_low)
        )

        for i in breakout_rows:
            i = int(i)
            entry_price = closes[i]
            entry_time = after_930_data["datetime"].iloc[i]

            # Swings at positions <= current_idx - swing_window are the only
            # ones already confirmed when the signal bar closes.
            current_idx = session_offset + i
            confirmed = day_data.iloc[: max(current_idx - swing_window + 1, 0)]

            if entry_price > first_15_high:
                trade_signal = "LONG"
                swings = confirmed["swing_low"].dropna()
            else:
                trade_signal = "SHORT"
                swings = confirmed["swing_high"].dropna()

            if swings.empty:
                continue

            stop_loss = swings.iloc[-1]
            if trade_signal == "LONG":
                risk = entry_price - stop_loss
                target = entry_price + (risk * risk_reward_ratio)
            else:
                risk = stop_loss - entry_price
                target = entry_price - (risk * risk_reward_ratio)

            # Only take the trade if the stop is on the losing side of entry
            if not risk > 0:
                continue

            # Position size so that a stop-out loses risk_fraction of capital
            position_size = int(current_capital * risk_fraction / risk)
            if position_size <= 0:
                continue

            exit_price, exit_reason, exit_time = _find_exit(
                after_930_data, i, trade_signal, stop_loss, target
            )

            # Calculate P&L
            if trade_signal == "LONG":
                pnl_per_share = exit_price - entry_price
            else:  # SHORT
                pnl_per_share = entry_price - exit_price

            total_pnl = pnl_per_share * position_size
            current_capital += total_pnl

            # Record trade
            trades.append(
                {
                    "date": date,
                    "signal": trade_signal,
                    "entry_time": entry_time,
                    "exit_time": exit_time,
                    "entry_price": entry_price,
                    "exit_price": exit_price,
                    "stop_loss": stop_loss,
                    "target": target,
                    "exit_reason": exit_reason,
                    "position_size": position_size,
                    "pnl_per_share": pnl_per_share,
                    "total_pnl": total_pnl,
                    "capital_after_trade": current_capital,
                    "first_15_high": first_15_high,
                    "first_15_low": first_15_low,
                    "risk_per_share": abs(entry_price - stop_loss),
                    "reward_per_share": abs(target - entry_price),
                }
            )

            # Record equity point
            equity_curve.append(
                {
                    "date": exit_time,
                    "capital": current_capital,
                    "trade_pnl": total_pnl,
                }
            )

            break  # Only one trade per day

    # Convert to DataFrames
    trades_df = pd.DataFrame(trades)
    equity_df = pd.DataFrame(equity_curve)

    return _summarize_backtest(trades_df, equity_df, initial_capital, current_capital)


def calculate_max_drawdown(capital_series):
    """
    Calculate maximum drawdown from capital series

    The drawdown at each point is the percentage distance of the capital
    below the highest capital seen so far; the most negative value is
    returned (0 when the capital never falls below a previous peak).

    Parameters:
    capital_series: Capital values in chronological order (a pandas Series
                    or any 1-D sequence of numbers)

    Returns:
    Maximum drawdown in percent as a non-positive number, or 0.0 for an
    empty series
    """
    capital = pd.Series(capital_series, dtype=float)
    if capital.empty:
        return 0.0

    running_peak = capital.expanding().max()
    # NOTE(review): a running peak of exactly 0 divides by zero here;
    # capital is assumed to be positive.
    drawdown = (capital - running_peak) / running_peak * 100
    return drawdown.min()


def plot_equity_curve(results, save_path=None):
    """
    Draw the equity curve and the per-trade P&L bars in one figure

    Parameters:
    results: Backtest results dictionary; reads 'equity_curve' (with
             'date', 'capital' and 'trade_pnl' columns) and 'initial_capital'
    save_path: Optional file path; when given the figure is also saved there

    Returns:
    None. Prints a message and draws nothing when there are no equity points.
    """
    curve = results["equity_curve"]
    if len(curve) == 0:
        print("No trades to plot")
        return

    _, axes = plt.subplots(2, 1, figsize=(12, 10))
    capital_ax, pnl_ax = axes

    # Top panel: capital after each trade against the starting capital
    capital_ax.plot(curve["date"], curve["capital"], linewidth=2, color="blue")
    capital_ax.axhline(
        y=results["initial_capital"],
        color="red",
        linestyle="--",
        alpha=0.7,
        label="Initial Capital",
    )
    capital_ax.set_title("Equity Curve", fontsize=14, fontweight="bold")
    capital_ax.set_ylabel("Capital (₹)")
    capital_ax.grid(True, alpha=0.3)
    capital_ax.legend()

    def _as_rupees(value, _position):
        return f"₹{value:,.0f}"

    capital_ax.yaxis.set_major_formatter(plt.FuncFormatter(_as_rupees))

    # Bottom panel: one bar per trade, green for a profit and red otherwise
    trade_pnl = curve["trade_pnl"]
    bar_colors = []
    for pnl in trade_pnl:
        if pnl > 0:
            bar_colors.append("green")
        else:
            bar_colors.append("red")

    pnl_ax.bar(range(len(curve)), trade_pnl, color=bar_colors, alpha=0.7)
    pnl_ax.axhline(y=0, color="black", linestyle="-", alpha=0.5)
    pnl_ax.set_title("Trade P&L", fontsize=14, fontweight="bold")
    pnl_ax.set_xlabel("Trade Number")
    pnl_ax.set_ylabel("P&L (₹)")
    pnl_ax.grid(True, alpha=0.3)

    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches="tight")
        print(f"Equity curve saved to {save_path}")

    plt.show()


# Example usage:
def load_and_test_data(file_path, initial_capital=50000, risk_reward_ratio=2):
    """
    Load data from CSV, run the backtest, print a report and plot the equity curve

    Expected CSV format (the header must name every field in the rows):
    date,open,high,low,close,volume
    2015-01-09 09:15:00,8285.45,8295.9,8285.45,8292.1,0

    Parameters:
    file_path: Path of the CSV file, passed to pd.read_csv
    initial_capital: Starting capital handed to the backtest
    risk_reward_ratio: Risk/reward ratio handed to the backtest

    Returns:
    The results dictionary produced by the backtest
    """

    # Load data
    df = pd.read_csv(file_path)

    # Run backtest with equity curve
    results = backtest_first_15min_breakout_with_equity(
        df, initial_capital=initial_capital, risk_reward_ratio=risk_reward_ratio
    )

    # (result key, output template, value used when the key is absent).
    # Defaults keep the report printable even if a run without trades
    # returns a reduced results dictionary.
    report = [
        ("initial_capital", "Initial Capital: ₹{:,.2f}", initial_capital),
        ("final_capital", "Final Capital: ₹{:,.2f}", initial_capital),
        ("total_pnl", "Total P&L: ₹{:,.2f}", 0),
        ("total_return", "Total Return: {:.2f}%", 0),
        ("max_drawdown", "Max Drawdown: {:.2f}%", 0),
        ("total_trades", "Total Trades: {}", 0),
        ("winning_trades", "Winning Trades: {}", 0),
        ("losing_trades", "Losing Trades: {}", 0),
        ("win_ratio", "Win Ratio: {:.2f}%", 0),
        ("avg_win", "Average Win: ₹{:,.2f}", 0),
        ("avg_loss", "Average Loss: ₹{:,.2f}", 0),
        ("target_hits", "Target Hits: {}", 0),
        ("stop_loss_hits", "Stop Loss Hits: {}", 0),
        ("eod_exits", "End of Day Exits: {}", 0),
        ("long_trades", "Long Trades: {}", 0),
        ("short_trades", "Short Trades: {}", 0),
    ]

    # Print results
    print("=== BACKTEST RESULTS ===")
    for key, template, default in report:
        print(template.format(results.get(key, default)))

    # Show sample trades
    if len(results["trades_df"]) > 0:
        print("\n=== SAMPLE TRADES ===")
        print(
            results["trades_df"][
                [
                    "date",
                    "signal",
                    "entry_price",
                    "exit_price",
                    "exit_reason",
                    "position_size",
                    "total_pnl",
                    "capital_after_trade",
                ]
            ].head(10)
        )

    # Plot equity curve
    if len(results["equity_curve"]) > 0:
        plot_equity_curve(results)

    return results

# In [None]:
# Run the full pipeline (load CSV, backtest, print report, plot) on the data
# file below; the path is relative to the current working directory.
load_and_test_data("NIFTY 50_minute_data.csv")