In [1]:
pip install pandas numpy


Note: you may need to restart the kernel to use updated packages.


In [3]:
import pandas as pd
import numpy as np
from collections import defaultdict, Counter

# Load and prepare data
file_path = r"C:\Users\Broker\Downloads\eurusd.csv"
df = pd.read_csv(file_path)

# Handle datetime conversion
df["Gmt time"] = pd.to_datetime(df["Gmt time"], format="%d.%m.%Y %H:%M:%S.%f")
df["PKT time"] = df["Gmt time"] + pd.Timedelta(hours=5)
df.set_index("PKT time", inplace=True)

# Filter session (1 PM to 5 PM PKT)
df["Hour"] = df.index.hour
df_session = df[(df["Hour"] >= 13) & (df["Hour"] < 17)].copy()
session_groups = df_session.groupby(df_session.index.date)

# Calculate Goldbach levels
def calculate_goldbach_levels(group):
    high = group['High'].max()
    low = group['Low'].min()
    range_ = high - low
    
    if range_ == 0:
        return {
            'gold_0.1': low,
            'gold_0.2': low,
            'gold_0.8': low,
            'gold_0.9': low
        }
    
    return {
        'gold_0.1': low + range_ * 0.1,
        'gold_0.2': low + range_ * 0.2,
        'gold_0.8': low + range_ * 0.8,
        'gold_0.9': low + range_ * 0.9
    }

# Calculate AXR and ATR
def calculate_axr_atr(group, lookback=20, atr_period=14):
    group = group.copy()
    
    # Calculate ADR and AXR
    group['High_max'] = group['High'].rolling(window=lookback, min_periods=1).max()
    group['Low_min'] = group['Low'].rolling(window=lookback, min_periods=1).min()
    group['ADR'] = group['High_max'] - group['Low_min']
    
    # Avoid division by zero
    group['AXR'] = np.where(
        group['ADR'] > 0,
        (group['Close'] - group['Low_min']) / group['ADR'],
        0.5
    )
    
    # Calculate ATR
    group['High_low'] = group['High'] - group['Low']
    group['High_close'] = abs(group['High'] - group['Close'].shift())
    group['Low_close'] = abs(group['Low'] - group['Close'].shift())
    group['TR'] = group[['High_low', 'High_close', 'Low_close']].max(axis=1)
    group['ATR'] = group['TR'].rolling(window=atr_period, min_periods=1).mean()
    
    return group

# Main strategy implementation
def detect_fvg_and_backtest(group, bias):
    trades = []
    candles = group.copy().reset_index()
    
    # Validate minimum data
    if len(candles) < 5:
        return trades
    
    goldbach_levels = calculate_goldbach_levels(group)
    group = calculate_axr_atr(group)
    
    for i in range(2, len(candles) - 3):
        try:
            c0 = candles.iloc[i-2]
            c1 = candles.iloc[i-1]
            c2 = candles.iloc[i]
            
            atr = group['ATR'].iloc[i]
            if pd.isna(atr) or atr == 0:
                atr = 0.0005
            
            # Goldbach zone check
            in_goldbach_zone = (
                (c0["High"] <= goldbach_levels['gold_0.2'] and c2["Low"] >= goldbach_levels['gold_0.1']) or
                (c0["High"] <= goldbach_levels['gold_0.9'] and c2["Low"] >= goldbach_levels['gold_0.8'])
            )
            
            # AXR threshold
            axr_value = group['AXR'].iloc[i]
            if pd.isna(axr_value):
                axr_value = 0.5
            
            axr_threshold = 0.6 if bias == "short" else 0.4
            axr_ok = axr_value > axr_threshold if bias == "short" else axr_value < axr_threshold
            
            # Minimum FVG size filter
            min_fvg_size = atr * 0.3
            fvg_size = abs(c2["Low"] - c0["High"]) if bias == "short" else abs(c0["Low"] - c2["High"])
            
            # SHORT: c0 above c2
            if bias == "short" and c0["High"] < c2["Low"] and in_goldbach_zone and axr_ok and fvg_size >= min_fvg_size:
                entry = c2["Low"]
                sl = max(candles.iloc[i+1]["High"], c2["High"], c1["High"])
                
                if sl > entry:  # Valid SL
                    rr_ratio = 2.5 if atr > 0.0015 else 3.5
                    tp = entry - rr_ratio * (sl - entry)
                    
                    # Check next 3 candles
                    for j in range(i+1, min(i+4, len(candles))):
                        high_j = candles.iloc[j]["High"]
                        low_j = candles.iloc[j]["Low"]
                        
                        if high_j >= entry:
                            if low_j <= tp:
                                trades.append("win")
                                break
                            elif high_j >= sl:
                                trades.append("loss")
                                break
            
            # LONG: c0 below c2
            elif bias == "long" and c0["Low"] > c2["High"] and in_goldbach_zone and axr_ok and fvg_size >= min_fvg_size:
                entry = c2["High"]
                sl = min(candles.iloc[i+1]["Low"], c2["Low"], c1["Low"])
                
                if sl < entry:  # Valid SL
                    rr_ratio = 2.5 if atr > 0.0015 else 3.5
                    tp = entry + rr_ratio * (entry - sl)
                    
                    # Check next 3 candles
                    for j in range(i+1, min(i+4, len(candles))):
                        high_j = candles.iloc[j]["High"]
                        low_j = candles.iloc[j]["Low"]
                        
                        if low_j <= entry:
                            if high_j >= tp:
                                trades.append("win")
                                break
                            elif low_j <= sl:
                                trades.append("loss")
                                break
        
        except (KeyError, IndexError, ValueError) as e:
            continue
    
    return trades

# Execute strategy
bias_results = defaultdict(str)
all_trades = []

for date, group in session_groups:
    try:
        # Skip if insufficient data
        if len(group) < 5:
            continue
        
        # Determine bias
        open_price = group.iloc[0]["Open"]
        above_count = (group["High"] > open_price).sum()
        below_count = (group["Low"] < open_price).sum()

        if above_count >= 24:
            bias = "short"
        elif below_count >= 24:
            bias = "long"
        else:
            bias = "no trade"
        
        bias_results[date] = bias
        
        if bias == "no trade":
            continue
        
        trades = detect_fvg_and_backtest(group, bias)
        all_trades.extend(trades)
    
    except Exception as e:
        print(f"Error processing date {date}: {e}")
        continue

# Results analysis
results = Counter(all_trades)
total_trades = results['win'] + results['loss'] if 'win' in results and 'loss' in results else 0
win_rate = (results.get('win', 0) / total_trades * 100) if total_trades > 0 else 0
profit_factor = (results.get('win', 0) / results.get('loss', 1)) if results.get('loss', 0) > 0 else 0

print("\n=== FVG Backtest Results ===")
print(f"Total Trades: {total_trades}")
print(f"Winning Trades: {results.get('win', 0)}")
print(f"Losing Trades: {results.get('loss', 0)}")
print(f"Win Rate: {win_rate:.2f}%")
print(f"Profit Factor: {profit_factor:.2f}")
print(f"\nBias Distribution:")
for date, bias in sorted(bias_results.items()):
    print(f"  {date}: {bias}")


=== FVG Backtest Results ===
Total Trades: 0
Winning Trades: 0
Losing Trades: 23
Win Rate: 0.00%
Profit Factor: 0.00

Bias Distribution:
  2023-11-14: short
  2023-11-15: long
  2023-11-16: short
  2023-11-17: short
  2023-11-18: no trade
  2023-11-19: no trade
  2023-11-20: short
  2023-11-21: long
  2023-11-22: long
  2023-11-23: short
  2023-11-24: short
  2023-11-25: no trade
  2023-11-26: no trade
  2023-11-27: short
  2023-11-28: short
  2023-11-29: long
  2023-11-30: long
  2023-12-01: short
  2023-12-02: no trade
  2023-12-03: no trade
  2023-12-04: short
  2023-12-05: short
  2023-12-06: short
  2023-12-07: long
  2023-12-08: short
  2023-12-09: no trade
  2023-12-10: no trade
  2023-12-11: short
  2023-12-12: short
  2023-12-13: short
  2023-12-14: short
  2023-12-15: long
  2023-12-16: no trade
  2023-12-17: no trade
  2023-12-18: long
  2023-12-19: short
  2023-12-20: long
  2023-12-21: short
  2023-12-22: short
  2023-12-23: no trade
  2023-12-24: no trade
  2023-12-25: n

In [31]:
# In your detect_fvg_and_backtest function, replace with:

def detect_fvg_and_backtest(group, bias):
    trades = []
    candles = group.copy().reset_index()
    goldbach_levels = calculate_goldbach_levels(group)
    group = calculate_axr(group)
    
    for i in range(2, len(candles) - 3):  # Leave room for 3-candle evaluation
        c0, c1, c2 = candles.loc[i-2], candles.loc[i-1], candles.loc[i]
        atr = (group['High'].rolling(14).max() - group['Low'].rolling(14).min()).iloc[i]
        
        # Wider Goldbach zones
        in_goldbach_zone = (
            (c0["High"] <= goldbach_levels['gold_0.2'] and c2["Low"] >= goldbach_levels['gold_0.1']) or
            (c0["High"] <= goldbach_levels['gold_0.9'] and c2["Low"] >= goldbach_levels['gold_0.8'])
        )
        
        # Dynamic AXR threshold
        axr_threshold = 0.6 if bias == "short" else 0.4
        axr_ok = c2['AXR'] > axr_threshold if bias == "short" else c2['AXR'] < axr_threshold
        
        if bias == "short" and c0["High"] < c2["Low"] and in_goldbach_zone and axr_ok:
            entry = c2["Low"]
            sl = max(candles.loc[i+1]["High"], c2["High"], c1["High"])
            rr_ratio = 2.5 if atr > 0.0015 else 3.5
            tp = entry - rr_ratio * (sl - entry)
            
            for j in range(i+1, min(i+4, len(candles))):
                if candles.loc[j]["High"] >= entry:
                    if candles.loc[j]["Low"] <= tp:
                        trades.append("win")
                        break
                    elif candles.loc[j]["High"] >= sl:
                        trades.append("loss")
                        break
        
        elif bias == "long" and c0["Low"] > c2["High"] and in_goldbach_zone and axr_ok:
            entry = c2["High"]
            sl = min(candles.loc[i+1]["Low"], c2["Low"], c1["Low"])
            rr_ratio = 2.5 if atr > 0.0015 else 3.5
            tp = entry + rr_ratio * (entry - sl)
            
            for j in range(i+1, min(i+4, len(candles))):
                if candles.loc[j]["Low"] <= entry:
                    if candles.loc[j]["High"] >= tp:
                        trades.append("win")
                        break
                    elif candles.loc[j]["Low"] <= sl:
                        trades.append("loss")
                        break
    
    return trades

In [5]:
import pandas as pd
import numpy as np
from collections import defaultdict, Counter

# Load and prepare data
file_path = r"C:\Users\Broker\Downloads\eurusd.csv"
df = pd.read_csv(file_path)
df["Gmt time"] = pd.to_datetime(df["Gmt time"], format="%d.%m.%Y %H:%M:%S.%f")
df["PKT time"] = df["Gmt time"] + pd.Timedelta(hours=5)
df.set_index("PKT time", inplace=True)

# Filter session (1 PM to 5 PM PKT)
df["Hour"] = df.index.hour
df_session = df[(df["Hour"] >= 13) & (df["Hour"] < 17)].copy()
session_groups = df_session.groupby(df_session.index.date)

# Calculate Goldbach levels (PO3 27)
def calculate_goldbach_levels(group):
    high = group['High'].max()
    low = group['Low'].min()
    range_ = high - low
    
    return {
        'gold_0.1': low + range_ * 0.1,
        'gold_0.2': low + range_ * 0.2,
        'gold_0.8': low + range_ * 0.8,
        'gold_0.9': low + range_ * 0.9
    }

# Calculate AXR (20-period lookback) and ADR
def calculate_axr_adr(group, lookback=20):
    group['ADR'] = group['High'].rolling(lookback).max() - group['Low'].rolling(lookback).min()
    group['AXR'] = (group['Close'] - group['Low'].rolling(lookback).min()) / group['ADR']
    group['ATR'] = group['High'].rolling(14).max() - group['Low'].rolling(14).min()
    return group

# Main strategy implementation
def detect_fvg_and_backtest(group, bias):
    trades = []
    candles = group.copy()
    candles = candles.reset_index()  # Convert index to column
    goldbach_levels = calculate_goldbach_levels(group)
    group = calculate_axr_adr(group)
    
    for i in range(2, len(candles) - 3):
        c0, c1, c2 = candles.iloc[i-2], candles.iloc[i-1], candles.iloc[i]
        atr = group['ATR'].iloc[i]
        
        # Get time correctly from the datetime column
        current_time = candles.iloc[i]['PKT time']
        if current_time.hour > 15:  # After 3PM PKT
            continue
            
        # Wider Goldbach zones
        in_goldbach_zone = (
            (c0["High"] <= goldbach_levels['gold_0.2'] and c2["Low"] >= goldbach_levels['gold_0.1']) or
            (c0["High"] <= goldbach_levels['gold_0.9'] and c2["Low"] >= goldbach_levels['gold_0.8'])
        )
        
        # Dynamic AXR threshold
        axr_threshold = 0.6 if bias == "short" else 0.4
        axr_ok = c2['AXR'] > axr_threshold if bias == "short" else c2['AXR'] < axr_threshold
        
        # Minimum FVG size filter
        min_fvg_size = atr * 0.3
        fvg_size = c2["Low"] - c0["High"] if bias == "short" else c0["Low"] - c2["High"]
        
        if bias == "short" and c0["High"] < c2["Low"] and in_goldbach_zone and axr_ok and fvg_size >= min_fvg_size:
            entry = c2["Low"]
            sl = max(candles.iloc[i+1]["High"], c2["High"], c1["High"])
            rr_ratio = 2.5 if atr > 0.0015 else 3.5
            tp = entry - rr_ratio * (sl - entry)
            
            for j in range(i+1, min(i+4, len(candles))):
                if candles.iloc[j]["High"] >= entry:
                    if candles.iloc[j]["Low"] <= tp:
                        trades.append("win")
                        break
                    elif candles.iloc[j]["High"] >= sl:
                        trades.append("loss")
                        break
        
        elif bias == "long" and c0["Low"] > c2["High"] and in_goldbach_zone and axr_ok and fvg_size >= min_fvg_size:
            entry = c2["High"]
            sl = min(candles.iloc[i+1]["Low"], c2["Low"], c1["Low"])
            rr_ratio = 2.5 if atr > 0.0015 else 3.5
            tp = entry + rr_ratio * (entry - sl)
            
            for j in range(i+1, min(i+4, len(candles))):
                if candles.iloc[j]["Low"] <= entry:
                    if candles.iloc[j]["High"] >= tp:
                        trades.append("win")
                        break
                    elif candles.iloc[j]["Low"] <= sl:
                        trades.append("loss")
                        break
    
    return trades

# Execute strategy
bias_results = defaultdict(str)
all_trades = []

for date, group in session_groups:
    # Determine bias
    open_price = group.iloc[0]["Open"]
    above_count = (group["High"] > open_price).sum()
    below_count = (group["Low"] < open_price).sum()

    if above_count >= 24:
        bias = "short"
    elif below_count >= 24:
        bias = "long"
    else:
        bias = "no trade"
    
    bias_results[date] = bias
    
    if bias == "no trade":
        continue
    
    trades = detect_fvg_and_backtest(group, bias)
    all_trades.extend(trades)

# Results analysis
results = Counter(all_trades)
total_trades = results['win'] + results['loss'] if 'win' in results and 'loss' in results else 0
win_rate = results['win'] / total_trades * 100 if total_trades > 0 else 0

print("\n=== Backtest Results ===")
print(f"Total Trades: {total_trades}")
print(f"Winning Trades: {results.get('win', 0)}")
print(f"Losing Trades: {results.get('loss', 0)}")
print(f"Win Rate: {win_rate:.2f}%")
print(f"Profit Factor: {results.get('win', 0)/results.get('loss', 1):.2f}" if results.get('loss', 0) > 0 else 'N/A')

KeyError: 'AXR'