In [7]:
import pandas as pd
import numpy as np
import random

# ============= GA and Leader Correction settings ============
# Number of individuals created by initialize_population.
POP_SIZE = 30
# Probability that crossover() recombines a pair of parents.
CROSSOVER_RATE = 0.8
# Per-gene probability that mutate() resamples the gene from its bounds.
MUTATION_RATE = 0.12
# Generations run per model and per window.
N_GENERATIONS = 50
# Number of top-scoring individuals carried over by elitism_selection.
ELITE_SIZE = 2
# Number of rows per optimisation window.
WINDOW_SIZE = 200

# Model names understood by backtest_model; each has an entry in PARAM_BOUNDS.
MODELS = ['PPPL', 'dSMA']
# (low, high) sampling range per parameter. Key order matters: individuals are
# flat lists built by iterating these dicts, and backtest_model unpacks them
# positionally as (n_SMA, d11, SL, TP).
PARAM_BOUNDS = {
    'PPPL': {'n_SMA': (14, 20), 'd11': (0.05, 0.2), 'SL': (50, 150), 'TP': (50, 200)},
    'dSMA': {'n_SMA': (14, 20), 'd11': (0.05, 0.2), 'SL': (50, 150), 'TP': (50, 200)}
}

# ================= Indicator calculation functions =================
def ValueATR(df, period):
    """Add a ``ValueATR`` column holding the rolling mean of ``High - Low``.

    The column is written onto ``df`` itself (in place) and the same frame is
    returned. Rows before the first full ``period``-sized window are NaN.
    """
    bar_range = df['High'].sub(df['Low'])
    smoothed_range = bar_range.rolling(window=period).mean()
    df['ValueATR'] = smoothed_range
    return df

def avgHL(df, period):
    """Add an ``avgHL`` column: rolling mean of the bar range ``High - Low``.

    Mutates ``df`` in place and returns it. The first ``period - 1`` rows are
    NaN because the rolling window is not yet full.
    """
    high_low_spread = df['High'].sub(df['Low'])
    df['avgHL'] = high_low_spread.rolling(window=period).mean()
    return df

def pp_pl(df, period):
    """Add candle-body strength columns to ``df`` (in place) and return it.

    Columns written, in this order:
      * ``d``  - candle body, ``Close - Open``
      * ``pp`` - rolling mean of the positive bodies (others counted as 0)
      * ``pl`` - rolling mean of the absolute negative bodies (others as 0)
      * ``dP`` - ``|pp - pl|`` divided by the larger of ``pp`` and ``pl``
                 (NaN when both are 0 or not yet available)
    """
    body = df['Close'] - df['Open']
    df['d'] = body

    # Keep the value where the condition holds, otherwise 0 (NaN bodies -> 0).
    up_moves = body.where(body > 0, 0)
    down_moves = body.abs().where(body < 0, 0)

    df['pp'] = up_moves.rolling(period).mean()
    df['pl'] = down_moves.rolling(period).mean()

    imbalance = (df['pp'] - df['pl']).abs()
    dominant_side = df[['pp', 'pl']].max(axis=1)
    df['dP'] = imbalance / dominant_side
    return df

def DSMA(df, period):
    """Add Open-versus-SMA distance columns to ``df`` (in place); return it.

    Columns written, in this order:
      * ``aa``   - copy of ``Open``
      * ``bb``   - rolling mean of ``Close`` over ``period`` rows
      * ``dSMA`` - ``aa - bb``
      * ``dD``   - ``|dSMA|`` divided by the row-wise maximum of ``aa``/``bb``
    """
    open_price = df['Open']
    close_sma = df['Close'].rolling(period).mean()

    df['aa'] = open_price
    df['bb'] = close_sma

    gap = df['aa'] - df['bb']
    df['dSMA'] = gap

    larger_leg = df[['aa', 'bb']].max(axis=1)
    df['dD'] = gap.abs() / larger_leg
    return df

def Setup_Sell_Buy(flag_buy, flag_sell, Open, avgHL_val, wsp_Sl, wsp_Tp):
    """Return the ``(stop_loss, take_profit)`` price levels for a new position.

    Distances are ``avgHL_val * wsp_Sl`` and ``avgHL_val * wsp_Tp`` measured
    from ``Open``. A buy (``flag_buy == 1``) puts the stop below and the target
    above the open; a sell (``flag_sell == 1``) mirrors that. The buy flag is
    checked first. ``(None, None)`` is returned when neither flag equals 1.
    """
    stop_distance = avgHL_val * wsp_Sl
    target_distance = avgHL_val * wsp_Tp

    if flag_buy == 1:
        return Open - stop_distance, Open + target_distance
    if flag_sell == 1:
        return Open + stop_distance, Open - target_distance
    return None, None

# ================= Backtest (positional bar-by-bar loop) =================
def backtest_model(model_name, params, data):
    """Backtest one rule set over ``data`` and return summary metrics.

    Parameters
    ----------
    model_name : str
        ``'PPPL'`` opens long positions only, ``'dSMA'`` opens short positions
        only; any other value never trades.
    params : sequence
        ``(n_SMA, d11, SL, TP)``. ``n_SMA`` is truncated with ``int`` and used
        as the rolling period of every indicator; ``SL`` / ``TP`` multiply the
        previous bar's ``avgHL`` to give the stop / target distances.
    data : pandas.DataFrame
        Must provide ``Open``, ``High``, ``Low``, ``Close`` and ``Hour``
        columns. The frame is copied, so the caller's data is not modified.

    Returns
    -------
    tuple
        ``(net_profit, profit_risk_ratio, drawdown)``.

    Notes
    -----
    * A new position is opened at the bar's ``Open`` only while flat and when
      ``7 <= Hour < 19``; the signal is read from the previous bar.
    * An open position is checked against its stop and target on the entry bar
      and on every later bar until it closes. (Previously a position that did
      not close on its entry bar was never looked at again, and it blocked all
      further entries for the rest of the data.)
    * When both levels fall inside one bar, the stop is assumed to be hit first.
    * A position still open on the last bar is closed at the last ``Close``.
    * A later notebook cell defines another ``backtest_model`` with the
      signature ``(df, params, model_name)``; whichever cell ran last wins.
    """
    n_sma, d11, wsp_Sl, wsp_Tp = params
    period = int(n_sma)

    df = data.copy()
    pp_pl(df, period)
    DSMA(df, period)
    ValueATR(df, period)
    avgHL(df, period)

    # Pull the columns out once; per-element Series.iloc access inside the bar
    # loop is far slower than indexing plain arrays.
    hour = df['Hour'].to_numpy()
    open_ = df['Open'].to_numpy()
    high = df['High'].to_numpy()
    low = df['Low'].to_numpy()
    close = df['Close'].to_numpy()
    atr = df['ValueATR'].to_numpy()
    avg_hl = df['avgHL'].to_numpy()
    bs_power = df['dP'].to_numpy()   # the original 'BSPower' is an alias of 'dP'
    dsma = df['dSMA'].to_numpy()
    pp = df['pp'].to_numpy()
    pl = df['pl'].to_numpy()

    side = 0                  # +1 long, -1 short, 0 flat
    entry = Sl = Tp = None
    net_profit, trades = 0.0, 0

    for i in range(1, len(df)):
        if side == 0:
            # ---- entry logic (only while flat and inside trading hours) ----
            if not (7 <= hour[i] < 19):
                continue
            prev = i - 1
            if not (atr[prev] > 0 and bs_power[prev] > 0):
                continue
            if model_name == 'PPPL' and dsma[prev] > 0 and pp[prev] > pl[prev]:
                side = 1
            elif model_name == 'dSMA' and dsma[prev] < 0 and pp[prev] < pl[prev]:
                side = -1
            else:
                continue
            entry = open_[i]
            Sl, Tp = Setup_Sell_Buy(int(side == 1), int(side == -1), entry,
                                    avg_hl[prev], wsp_Sl, wsp_Tp)
            trades += 1

        # ---- exit logic (entry bar and every later bar) ----
        if side == 1:
            if low[i] <= Sl:
                net_profit += Sl - entry
                side = 0
            elif high[i] >= Tp:
                net_profit += Tp - entry
                side = 0
        else:
            if high[i] >= Sl:
                net_profit += entry - Sl
                side = 0
            elif low[i] <= Tp:
                net_profit += entry - Tp
                side = 0

    # Book whatever is still open at the end of the data at the last close.
    if side != 0:
        net_profit += side * (close[-1] - entry)

    # Scale the accumulated price difference (factor kept from the original;
    # presumably a contract size - confirm).
    net_profit *= 100000
    avg_profit = net_profit / trades if trades > 0 else 0
    # NOTE(review): avg_loss is derived from avg_profit, so the ratio below
    # always reduces to +/- 1/d11 (or 0); it does not measure realised losses.
    avg_loss = abs(avg_profit * d11)
    profit_risk_ratio = avg_profit / avg_loss if avg_loss else 0
    # NOTE(review): this drawdown is computed from the Close price series, not
    # from the strategy's equity, so it does not depend on params.
    equity_curve = df['Close'].cummax() - df['Close']
    drawdown = equity_curve.max() / df['Close'].max()
    return net_profit, profit_risk_ratio, drawdown

# ================= Standard GA operators =================
def initialize_population(bounds):
    """Create ``POP_SIZE`` random individuals.

    Each individual is a flat list with one gene per entry of ``bounds``, in
    the dict's iteration order, drawn uniformly between that entry's first and
    second element.
    """
    population = []
    for _ in range(POP_SIZE):
        genome = []
        for span in bounds.values():
            genome.append(random.uniform(span[0], span[1]))
        population.append(genome)
    return population

def evaluate_population(pop, objective_func):
    """Apply ``objective_func`` to every individual; return the scores as a list
    in population order."""
    return list(map(objective_func, pop))

def tournament_selection(pop, scores, k=3):
    """Pick ``len(pop)`` parents by k-way tournament.

    For every slot, ``k`` distinct (individual, score) pairs are sampled and
    the individual with the highest score wins (the first one drawn wins a
    tie). The returned list holds references to the original individuals, not
    copies, so the same object can appear several times.
    """
    scored = list(zip(pop, scores))

    def _run_tournament():
        contenders = random.sample(scored, k)
        champion = contenders[0]
        for rival in contenders[1:]:
            if rival[1] > champion[1]:
                champion = rival
        return champion[0]

    return [_run_tournament() for _ in range(len(pop))]

def crossover(p1, p2, rate=None):
    """Single-point crossover of two parents; always returns two NEW children.

    Parameters
    ----------
    p1, p2 : list
        Parent genomes; the cut point is chosen from ``p1``'s length.
    rate : float, optional
        Probability of recombining. Defaults to the module-level
        ``CROSSOVER_RATE`` when omitted.

    Returns
    -------
    tuple
        ``(child1, child2)``. When no recombination happens the children are
        copies of the parents rather than the parent objects themselves:
        ``mutate`` edits its argument in place, so handing back the parents
        would let a mutation corrupt individuals that are still referenced
        elsewhere (for example the elite, or the same parent selected twice).

    Genomes with fewer than two genes have no valid cut point and are returned
    as copies, instead of raising from ``random.randint(1, 0)``.
    """
    if rate is None:
        rate = CROSSOVER_RATE
    if len(p1) > 1 and random.random() < rate:
        point = random.randint(1, len(p1) - 1)
        return p1[:point] + p2[point:], p2[:point] + p1[point:]
    # Slicing copies a list, so callers may mutate the result freely.
    return p1[:], p2[:]

def mutate(ind, bounds):
    """Resample genes of ``ind`` in place and return the same list.

    Gene ``i`` corresponds to the ``i``-th entry of ``bounds``. Each gene is,
    with probability ``MUTATION_RATE``, replaced by a fresh uniform draw
    between that entry's first and second element.
    """
    for gene_idx, span in enumerate(bounds.values()):
        if random.random() < MUTATION_RATE:
            low, high = span[0], span[1]
            ind[gene_idx] = random.uniform(low, high)
    return ind

def elitism_selection(pop, scores):
    """Return the ``ELITE_SIZE`` highest-scoring individuals.

    The result is ordered by ascending score and contains the original
    objects from ``pop`` (no copies are made).
    """
    ranking = np.argsort(scores)
    top_indices = ranking[-ELITE_SIZE:]
    return [pop[idx] for idx in top_indices]

# ================= Leader Correction on reset-index windows =================
def genetic_algorithm_leader_correction(data):
    """Run the GA per window and per model, then report the best ("leader") model.

    ``data`` is cut into consecutive, non-overlapping windows of
    ``WINDOW_SIZE`` rows (trailing rows that do not fill a window are
    ignored). For every window each model in ``MODELS`` is optimised with the
    GA using net profit as fitness; the model with the highest final net
    profit becomes the leader. One result row is produced per window, holding
    the leader's parameters, its metrics, and its percentage improvement over
    the mean of all models' best metrics.

    Returns
    -------
    pandas.DataFrame
        One row per window (empty when ``data`` has fewer than
        ``WINDOW_SIZE`` rows).

    Notes
    -----
    * Calls the ``backtest_model(model_name, params, data)`` variant. A later
      notebook cell redefines ``backtest_model`` with another signature, so
      this function only works while the matching definition is the active one.
    * ``random`` is not seeded here; results differ between runs.
    """
    results = []
    # "+ 1" keeps the last full window when len(data) is an exact multiple of
    # WINDOW_SIZE (the previous bound silently dropped it).
    for start in range(0, len(data) - WINDOW_SIZE + 1, WINDOW_SIZE):
        window_data = data.iloc[start:start+WINDOW_SIZE].reset_index(drop=True)

        best_models, base_metrics, best_scores = {}, {}, {}
        for model in MODELS:
            bounds = PARAM_BOUNDS[model]

            def fitness(individual, _model=model, _window=window_data):
                # Net profit is the first element of the backtest result.
                return backtest_model(_model, individual, _window)[0]

            pop = initialize_population(bounds)
            for _ in range(N_GENERATIONS):
                scores = evaluate_population(pop, fitness)
                elite = elitism_selection(pop, scores)
                selected = tournament_selection(pop, scores)
                children = []
                for i in range(0, POP_SIZE, 2):
                    c1, c2 = crossover(selected[i], selected[(i+1) % POP_SIZE])
                    # mutate() edits its argument in place; copying first
                    # guarantees a child never shares storage with a parent
                    # or an elite individual.
                    children.append(mutate(list(c1), bounds))
                    children.append(mutate(list(c2), bounds))
                pop = elite + children[:POP_SIZE-ELITE_SIZE]

            final_scores = evaluate_population(pop, fitness)
            best_idx = np.argmax(final_scores)
            best_models[model] = pop[best_idx]
            best_scores[model] = final_scores[best_idx]
            base_metrics[model] = backtest_model(model, pop[best_idx], window_data)

        leader_model = max(best_scores, key=best_scores.get)
        leader_params = best_models[leader_model]
        net_profit, profit_risk, drawdown = base_metrics[leader_model]

        # Reference level: mean of each model's best result in this window.
        before_np = np.mean([m[0] for m in base_metrics.values()])
        before_pr = np.mean([m[1] for m in base_metrics.values()])
        before_dd = np.mean([m[2] for m in base_metrics.values()])

        results.append({
            'Window_Start': start,
            'Leader_Model': leader_model,
            'n_SMA': leader_params[0], 'd11': leader_params[1],
            'SL': leader_params[2], 'TP': leader_params[3],
            'Net_Profit': net_profit,
            'Profit/Risk': profit_risk,
            'Drawdown': drawdown,
            '%Improvement_NP': (net_profit - before_np) / abs(before_np) * 100 if before_np else 0,
            '%Improvement_PR': (profit_risk - before_pr) / abs(before_pr) * 100 if before_pr else 0,
            # For drawdown a smaller value is the improvement, hence the sign flip.
            '%Improvement_DD': (before_dd - drawdown) / abs(before_dd) * 100 if before_dd else 0
        })
    return pd.DataFrame(results)

# ================= Single run =================
if __name__ == "__main__":
    def _load_h4_csv(csv_path):
        """Read a price CSV, parse ``Date`` and derive the integer ``Hour`` column."""
        frame = pd.read_csv(csv_path)
        frame['Date'] = pd.to_datetime(frame['Date'])
        frame['Hour'] = pd.to_datetime(frame['Time'], format='%H:%M:%S').dt.hour
        return frame

    # Both files are loaded before any optimisation starts, so a missing file
    # fails fast.
    df_train = _load_h4_csv("EURUSD_H4.csv")
    df_test = _load_h4_csv("USDCHF_H4.csv")

    # NOTE(review): the "test" run below performs its own GA optimisation on
    # df_test; it does not evaluate the parameters found on df_train.
    train_results = genetic_algorithm_leader_correction(df_train)
    train_results.to_excel("LC_results_train.xlsx", index=False)

    test_results = genetic_algorithm_leader_correction(df_test)
    test_results.to_excel("LC_results_test.xlsx", index=False)

    print("✅ خروجی‌ها ساخته شدند و ذخیره شدند.")
    print("فایل‌ها آماده: LC_results_train.xlsx و LC_results_test.xlsx")


✅ خروجی‌ها ساخته شدند و ذخیره شدند.
فایل‌ها آماده: LC_results_train.xlsx و LC_results_test.xlsx


In [8]:
import os
# Show what is in /content; the path is hard-coded (presumably the Colab
# working directory, given the google.colab import used in the next cell).
print(os.listdir('/content'))

['.config', 'LC_results_train.xlsx', 'LC_results_test.xlsx', 'USDCHF_H4.csv', 'drive', 'EURUSD_H4.csv', 'sample_data']


In [9]:
from google.colab import files
# Send the two result workbooks saved by the run in In [7] to the browser.
files.download('LC_results_train.xlsx')
files.download('LC_results_test.xlsx')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [12]:
import pandas as pd
import numpy as np

# ===== Placeholder backtest (random TP/SL outcome; replace with the real model logic) =====
def backtest_model(df, params, model_name):
    """Placeholder backtest returning ``(net_profit, profit_risk, drawdown)``.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs a ``Close`` column (values are converted to float).
    params : dict
        Only ``'SL'`` (default 90) and ``'TP'`` (default 130) are read, both
        interpreted as pips of size 0.0001. Other keys are ignored.
    model_name : str
        Accepted for interface compatibility; not used by this placeholder.

    Behaviour
    ---------
    A trade is opened on every bar whose close differs from the previous
    close (BUY if higher, SELL if lower). Whether it ends at the take-profit
    or the stop-loss is decided by ``np.random.rand() > 0.5``, so results are
    random unless the caller seeds ``np.random``.

    Returns
    -------
    net_profit : float
        Final equity minus the initial capital of 10000.
    profit_risk : float
        Mean winning trade divided by the absolute mean losing trade;
        ``0`` with no trades, ``np.inf`` with no losing trade, and ``0.0``
        when there are losing trades but no winning one (previously NaN).
    drawdown : float
        Maximum peak-to-trough equity decline in percent over the whole run
        (previously only the decline from the peak at the final bar).
    """
    initial_capital = 10000
    lot_size = 0.1
    stop_loss_pips = float(params.get('SL', 90))
    take_profit_pips = float(params.get('TP', 130))

    # One array conversion instead of a Series.iloc lookup per bar.
    closes = df['Close'].astype(float).to_numpy()

    equity = initial_capital
    peak_equity = initial_capital
    max_drawdown = 0.0
    trades = []

    for i in range(1, len(closes)):
        # Simple sample signal; to be replaced by the real model logic.
        if closes[i] > closes[i-1]:
            signal = 'BUY'
        elif closes[i] < closes[i-1]:
            signal = 'SELL'
        else:
            continue

        entry_price = float(closes[i])
        hit_target = np.random.rand() > 0.5
        if signal == 'BUY':
            exit_price = entry_price + (take_profit_pips * 0.0001) \
                         if hit_target else entry_price - (stop_loss_pips * 0.0001)
            profit = (exit_price - entry_price) * (100000 * lot_size)
        else:
            exit_price = entry_price - (take_profit_pips * 0.0001) \
                         if hit_target else entry_price + (stop_loss_pips * 0.0001)
            profit = (entry_price - exit_price) * (100000 * lot_size)

        equity += profit
        trades.append(profit)
        peak_equity = max(peak_equity, equity)
        # Track the worst decline seen so far, not just the final one.
        max_drawdown = max(max_drawdown, (peak_equity - equity) / peak_equity * 100)

    net_profit = equity - initial_capital

    wins = [t for t in trades if t > 0]
    losses = [t for t in trades if t < 0]
    if not trades:
        profit_risk = 0
    elif not losses:
        profit_risk = np.inf
    elif not wins:
        # Avoid np.mean([]) -> NaN (plus RuntimeWarning) when nothing won.
        profit_risk = 0.0
    else:
        profit_risk = np.mean(wins) / abs(np.mean(losses))

    drawdown = max_drawdown

    return net_profit, profit_risk, drawdown

# ===== Simple GA stand-in (random search within the parameter bounds) =====
def ga_optimize(df, model_name, param_bounds, n_candidates=30):
    """Random search: sample parameter sets and keep the most profitable one.

    Parameters
    ----------
    df : pandas.DataFrame
        Data handed unchanged to ``backtest_model``.
    model_name : str
        Passed through to ``backtest_model``.
    param_bounds : dict
        ``{name: (low, high)}``; every parameter is drawn uniformly from its
        range for each candidate.
    n_candidates : int, optional
        Number of random candidates to evaluate (default 30, the value that
        used to be hard-coded).

    Returns
    -------
    dict
        The candidate with the highest net profit. If no candidate's net
        profit compares greater than ``-inf`` (for example every result is
        NaN), the first sampled candidate is returned instead of ``None``, so
        callers can always use the result as a parameter dict.

    Raises
    ------
    ValueError
        If ``n_candidates`` is smaller than 1.
    """
    if n_candidates < 1:
        raise ValueError("n_candidates must be at least 1")

    best_params = None
    fallback = None
    best_score = -np.inf
    for _ in range(n_candidates):
        candidate = {k: float(np.random.uniform(low, high)) for k, (low, high) in param_bounds.items()}
        if fallback is None:
            fallback = candidate
        npf, _prr, _dd = backtest_model(df, candidate, model_name)
        if npf > best_score:
            best_params = candidate
            best_score = npf
    return best_params if best_params is not None else fallback

# ===== Leader Correction =====
def leader_correction(train_df, test_df, models, bounds, window_size=200):
    """Optimise every model per training window, pick the leader, then replay on test data.

    Parameters
    ----------
    train_df, test_df : pandas.DataFrame
        Price data passed to ``backtest_model``.
    models : iterable of str
        Model names; each must be a key of ``bounds``.
    bounds : dict
        ``{model: {param: (low, high)}}``.
    window_size : int, optional
        Rows per non-overlapping training window (default 200). Trailing rows
        that do not fill a window are ignored.

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        Training results (one row per window) and test results (the same rows
        with ``Net_Profit`` / ``Profit/Risk`` / ``Drawdown`` replaced by the
        leader parameters' result on the whole ``test_df``).
    """
    def _pct_improvement(best, base, lower_is_better=False):
        # Relative change in percent; NaN when the baseline is exactly 0.
        if base == 0:
            return np.nan
        delta = (base - best) if lower_is_better else (best - base)
        return delta / abs(base) * 100

    results_train = []
    results_test = []

    # "+ 1" keeps the last full window when len(train_df) is an exact multiple
    # of window_size (the previous bound silently dropped it).
    for start in range(0, len(train_df) - window_size + 1, window_size):
        window = train_df.iloc[start:start+window_size].reset_index(drop=True)
        winners = {}
        for model in models:
            params = ga_optimize(window, model, bounds[model])
            # Metrics come from a fresh backtest call with the chosen params.
            npf, prr, dd = backtest_model(window, params, model)
            winners[model] = (npf, prr, dd, params)

        leader = max(winners, key=lambda m: winners[m][0])
        best_npf, best_prr, best_dd, best_params = winners[leader]

        # Baseline "before optimisation": the lower bound of every parameter.
        base_params = {k: float(v[0]) for k, v in bounds[leader].items()}
        base_npf, base_prr, base_dd = backtest_model(window, base_params, leader)

        results_train.append({
            'Window_Start': start,
            'Leader_Model': leader,
            **best_params,
            'Net_Profit': best_npf,
            'Profit/Risk': best_prr,
            'Drawdown': best_dd,
            '%Improvement_NP': _pct_improvement(best_npf, base_npf),
            '%Improvement_PR': _pct_improvement(best_prr, base_prr),
            '%Improvement_DD': _pct_improvement(best_dd, base_dd, lower_is_better=True)
        })

    # Test: replay each window's leader parameters on the full test set.
    # NOTE(review): '%Improvement_*' in the test rows are copied from the
    # training row via **res; they are not recomputed on test data.
    for res in results_train:
        leader_params = {k: float(res[k]) for k in bounds[res['Leader_Model']]}
        npf, prr, dd = backtest_model(test_df, leader_params, res['Leader_Model'])
        results_test.append({
            **res,
            'Net_Profit': npf,
            'Profit/Risk': prr,
            'Drawdown': dd
        })

    return pd.DataFrame(results_train), pd.DataFrame(results_test)

# ===== Main run =====
if __name__ == "__main__":
    # Train and test sets are two separate CSV files, read without any
    # further preprocessing.
    train_df = pd.read_csv("USDCHF_H4.csv")
    test_df = pd.read_csv("USDCHF_H4_TesT.csv")

    models = ['PPPL', 'dSMA']
    # Per-model (low, high) sampling ranges. The backtest_model defined in
    # this cell only reads 'SL' and 'TP'; 'n_SMA' and 'd11' are sampled and
    # reported but do not influence the result.
    bounds = {
        'PPPL': {'n_SMA': (14, 20), 'd11': (0.1, 0.3), 'SL': (80, 120), 'TP': (120, 150)},
        'dSMA': {'n_SMA': (10, 18), 'd11': (0.05, 0.2), 'SL': (70, 110), 'TP': (110, 140)}
    }

    res_train, res_test = leader_correction(train_df, test_df, models, bounds, window_size=200)

    # These file names are the same ones the In [7] run wrote, so those
    # earlier workbooks are overwritten.
    res_train.to_excel("LC_results_train.xlsx", index=False)
    res_test.to_excel("LC_results_test.xlsx", index=False)

    print("✅ Files saved: LC_results_train.xlsx, LC_results_test.xlsx")


✅ Files saved: LC_results_train.xlsx, LC_results_test.xlsx


In [14]:
from google.colab import files
# Download the workbooks saved by the preceding cell (same file names as the
# earlier GA run, whose output they replaced).
files.download('LC_results_train.xlsx')
files.download('LC_results_test.xlsx')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [15]:
import pandas as pd
import numpy as np

# ===== Backtest identical to the GA cell's (placeholder; please replace with the real logic) =====
def backtest_model(df, params, model_name):
    """Placeholder backtest returning ``(net_profit, profit_risk, drawdown)``.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs a ``Close`` column (values are converted to float).
    params : dict
        Only ``'SL'`` (default 90) and ``'TP'`` (default 130) are read, both
        interpreted as pips of size 0.0001. Other keys are ignored.
    model_name : str
        Accepted for interface compatibility; not used by this placeholder.

    Behaviour
    ---------
    A trade is opened on every bar whose close differs from the previous
    close (BUY if higher, SELL if lower). Whether it ends at the take-profit
    or the stop-loss is decided by ``np.random.rand() > 0.5``, so results are
    random unless the caller seeds ``np.random``.

    Returns
    -------
    net_profit : float
        Final equity minus the initial capital of 10000.
    profit_risk : float
        Mean winning trade divided by the absolute mean losing trade;
        ``0`` with no trades, ``np.inf`` with no losing trade, and ``0.0``
        when there are losing trades but no winning one (previously NaN).
    drawdown : float
        Maximum peak-to-trough equity decline in percent over the whole run
        (previously only the decline from the peak at the final bar).
    """
    initial_capital = 10000
    lot_size = 0.1
    stop_loss_pips = float(params.get('SL', 90))
    take_profit_pips = float(params.get('TP', 130))

    # One array conversion instead of a Series.iloc lookup per bar.
    closes = df['Close'].astype(float).to_numpy()

    equity = initial_capital
    peak_equity = initial_capital
    max_drawdown = 0.0
    trades = []

    for i in range(1, len(closes)):
        # Simple sample signal; to be replaced by the real model logic.
        if closes[i] > closes[i-1]:
            signal = 'BUY'
        elif closes[i] < closes[i-1]:
            signal = 'SELL'
        else:
            continue

        entry_price = float(closes[i])
        hit_target = np.random.rand() > 0.5
        if signal == 'BUY':
            exit_price = entry_price + (take_profit_pips * 0.0001) \
                         if hit_target else entry_price - (stop_loss_pips * 0.0001)
            profit = (exit_price - entry_price) * (100000 * lot_size)
        else:
            exit_price = entry_price - (take_profit_pips * 0.0001) \
                         if hit_target else entry_price + (stop_loss_pips * 0.0001)
            profit = (entry_price - exit_price) * (100000 * lot_size)

        equity += profit
        trades.append(profit)
        peak_equity = max(peak_equity, equity)
        # Track the worst decline seen so far, not just the final one.
        max_drawdown = max(max_drawdown, (peak_equity - equity) / peak_equity * 100)

    net_profit = equity - initial_capital

    wins = [t for t in trades if t > 0]
    losses = [t for t in trades if t < 0]
    if not trades:
        profit_risk = 0
    elif not losses:
        profit_risk = np.inf
    elif not wins:
        # Avoid np.mean([]) -> NaN (plus RuntimeWarning) when nothing won.
        profit_risk = 0.0
    else:
        profit_risk = np.mean(wins) / abs(np.mean(losses))

    drawdown = max_drawdown

    return net_profit, profit_risk, drawdown

# ===== Simple PSO for parameter optimisation =====
def pso_optimize(df, model_name, param_bounds, pop_size=30, n_gen=50, w=0.7, c1=1.5, c2=1.5):
    """Particle-swarm search for the parameter set with the highest net profit.

    Parameters
    ----------
    df, model_name
        Passed through to ``backtest_model``.
    param_bounds : dict
        ``{name: (low, high)}``; positions are clipped to these limits.
    pop_size, n_gen : int
        Number of particles and number of iterations.
    w, c1, c2 : float
        Inertia weight and the personal-best / global-best pull coefficients.

    Returns
    -------
    (dict, float)
        Best parameter dict found and its net profit. One progress line is
        printed after every generation.
    """
    keys = list(param_bounds.keys())
    dim = len(param_bounds)
    lb = np.array([span[0] for span in param_bounds.values()])
    ub = np.array([span[1] for span in param_bounds.values()])

    def _net_profit(position):
        # Map a position vector back to named parameters and score it.
        candidate = {name: position[j] for j, name in enumerate(keys)}
        return backtest_model(df, candidate, model_name)[0]

    # Initialise particle positions and velocities.
    positions = np.random.uniform(low=lb, high=ub, size=(pop_size, dim))
    vmax = (ub - lb) * 0.2   # velocity cap: 20% of each parameter's range
    velocities = np.random.uniform(low=-vmax, high=vmax, size=(pop_size, dim))

    # Initial evaluation: every particle's start is its personal best.
    pbest_positions = positions.copy()
    pbest_scores = np.array([-np.inf] * pop_size)
    for idx in range(pop_size):
        pbest_scores[idx] = _net_profit(positions[idx])

    leader = np.argmax(pbest_scores)
    gbest_position = pbest_positions[leader].copy()
    gbest_score = pbest_scores[leader]

    for gen in range(n_gen):
        for idx in range(pop_size):
            r1 = np.random.rand(dim)
            r2 = np.random.rand(dim)
            cognitive = c1 * r1 * (pbest_positions[idx] - positions[idx])
            social = c2 * r2 * (gbest_position - positions[idx])
            velocities[idx] = np.clip(w * velocities[idx] + cognitive + social, -vmax, vmax)

            positions[idx] = np.clip(positions[idx] + velocities[idx], lb, ub)

            fitness = _net_profit(positions[idx])
            if fitness > pbest_scores[idx]:
                pbest_scores[idx] = fitness
                pbest_positions[idx] = positions[idx].copy()

                # The global best can only change when a personal best improved.
                if fitness > gbest_score:
                    gbest_score = fitness
                    gbest_position = positions[idx].copy()
        print(f"نسل {gen+1}/{n_gen} بهترین سود خالص: {gbest_score:.2f}")

    best_params = dict(zip(keys, gbest_position))
    return best_params, gbest_score

# ===== Leader Correction using PSO =====
def leader_correction_pso(train_df, test_df, models, bounds, window_size=200):
    """PSO variant of leader correction: optimise per window, pick the leader, replay on test data.

    Parameters
    ----------
    train_df, test_df : pandas.DataFrame
        Price data passed to ``backtest_model``.
    models : iterable of str
        Model names; each must be a key of ``bounds``.
    bounds : dict
        ``{model: {param: (low, high)}}``.
    window_size : int, optional
        Rows per non-overlapping training window (default 200). Trailing rows
        that do not fill a window are ignored.

    Returns
    -------
    (pandas.DataFrame, pandas.DataFrame)
        Training results (one row per window) and test results (the same rows
        with ``Net_Profit`` / ``Profit/Risk`` / ``Drawdown`` replaced by the
        leader parameters' result on the whole ``test_df``).

    Note that ``pso_optimize`` prints one line per generation for every model
    and window.
    """
    def _pct_improvement(best, base, lower_is_better=False):
        # Relative change in percent; NaN when the baseline is exactly 0.
        if base == 0:
            return np.nan
        delta = (base - best) if lower_is_better else (best - base)
        return delta / abs(base) * 100

    results_train = []
    results_test = []

    # "+ 1" keeps the last full window when len(train_df) is an exact multiple
    # of window_size (the previous bound silently dropped it).
    for start in range(0, len(train_df) - window_size + 1, window_size):
        window = train_df.iloc[start:start + window_size].reset_index(drop=True)
        winners = {}
        for model in models:
            params, _ = pso_optimize(window, model, bounds[model])
            # Metrics come from a fresh backtest call with the chosen params.
            npf, prr, dd = backtest_model(window, params, model)
            winners[model] = (npf, prr, dd, params)

        leader = max(winners, key=lambda m: winners[m][0])
        best_npf, best_prr, best_dd, best_params = winners[leader]

        # Baseline for comparison: the lower bound of every parameter.
        base_params = {k: float(v[0]) for k, v in bounds[leader].items()}
        base_npf, base_prr, base_dd = backtest_model(window, base_params, leader)

        results_train.append({
            'Window_Start': start,
            'Leader_Model': leader,
            **best_params,
            'Net_Profit': best_npf,
            'Profit/Risk': best_prr,
            'Drawdown': best_dd,
            '%Improvement_NP': _pct_improvement(best_npf, base_npf),
            '%Improvement_PR': _pct_improvement(best_prr, base_prr),
            '%Improvement_DD': _pct_improvement(best_dd, base_dd, lower_is_better=True)
        })

    # Test: replay each window's leader parameters on the full test set.
    # NOTE(review): '%Improvement_*' in the test rows are copied from the
    # training row via **res; they are not recomputed on test data.
    for res in results_train:
        leader_params = {k: float(res[k]) for k in bounds[res['Leader_Model']]}
        npf, prr, dd = backtest_model(test_df, leader_params, res['Leader_Model'])
        results_test.append({
            **res,
            'Net_Profit': npf,
            'Profit/Risk': prr,
            'Drawdown': dd
        })

    return pd.DataFrame(results_train), pd.DataFrame(results_test)


# ===== Main run =====
if __name__ == "__main__":
    # Train and test sets are two separate CSV files, read without any
    # further preprocessing.
    train_df = pd.read_csv("USDCHF_H4.csv")
    test_df = pd.read_csv("USDCHF_H4_TesT.csv")

    models = ['PPPL', 'dSMA']
    # Per-model (low, high) search ranges. The backtest_model defined in this
    # cell only reads 'SL' and 'TP'; 'n_SMA' and 'd11' are optimised and
    # reported but do not influence the result.
    bounds = {
        'PPPL': {'n_SMA': (14, 20), 'd11': (0.1, 0.3), 'SL': (80, 120), 'TP': (120, 150)},
        'dSMA': {'n_SMA': (10, 18), 'd11': (0.05, 0.2), 'SL': (70, 110), 'TP': (110, 140)}
    }

    res_train, res_test = leader_correction_pso(train_df, test_df, models, bounds, window_size=200)

    # The PSO results get their own "_PSO" file names, so the workbooks from
    # the earlier runs are left untouched.
    res_train.to_excel("LC_results_train_PSO.xlsx", index=False)
    res_test.to_excel("LC_results_test_PSO.xlsx", index=False)

    print("✅ فایل‌ها ذخیره شدند: LC_results_train_PSO.xlsx, LC_results_test_PSO.xlsx")


نسل 1/50 بهترین سود خالص: 9032.70
نسل 2/50 بهترین سود خالص: 9032.70
نسل 3/50 بهترین سود خالص: 9032.70
نسل 4/50 بهترین سود خالص: 9032.70
نسل 5/50 بهترین سود خالص: 9413.98
نسل 6/50 بهترین سود خالص: 9413.98
نسل 7/50 بهترین سود خالص: 9413.98
نسل 8/50 بهترین سود خالص: 11437.41
نسل 9/50 بهترین سود خالص: 11437.41
نسل 10/50 بهترین سود خالص: 11437.41
نسل 11/50 بهترین سود خالص: 11437.41
نسل 12/50 بهترین سود خالص: 11437.41
نسل 13/50 بهترین سود خالص: 11437.41
نسل 14/50 بهترین سود خالص: 11437.41
نسل 15/50 بهترین سود خالص: 11437.41
نسل 16/50 بهترین سود خالص: 11437.41
نسل 17/50 بهترین سود خالص: 11437.41
نسل 18/50 بهترین سود خالص: 11437.41
نسل 19/50 بهترین سود خالص: 11437.41
نسل 20/50 بهترین سود خالص: 11437.41
نسل 21/50 بهترین سود خالص: 11437.41
نسل 22/50 بهترین سود خالص: 11437.41
نسل 23/50 بهترین سود خالص: 11437.41
نسل 24/50 بهترین سود خالص: 11437.41
نسل 25/50 بهترین سود خالص: 11437.41
نسل 26/50 بهترین سود خالص: 11437.41
نسل 27/50 بهترین سود خالص: 11437.41
نسل 28/50 بهترین سود خالص: 11437.41
نسل 29/5

In [16]:
from google.colab import files
# Download the workbooks written by the PSO run in the preceding cell. The
# plain 'LC_results_*.xlsx' names used here before belong to the earlier,
# non-PSO run and were already downloaded in In [14].
files.download('LC_results_train_PSO.xlsx')
files.download('LC_results_test_PSO.xlsx')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>