# **Notebook Overview: Rule-Based Trading with Metaheuristic Rule Discovery**

This notebook presents a complete demonstration of a **rule-based trading system** applied to Ethereum price data. The objective is to illustrate how **Genetic Algorithms (GA)** can be used to *automatically discover interpretable IF–THEN trading rules* based on engineered features from **Phase 1 of the course** .

The implementation follows the conceptual framework introduced in the accompanying lecture materials on **Rule Discovery with Metaheuristics** .

---

## **Purpose of the Notebook**

The notebook serves as a educational example demonstrating:

1. **How trading rules are encoded as chromosomes** within a GA.
2. **How conditions, actions, TP/SL levels, and position sizing are represented genetically.**
3. **How a rule list is decoded and executed via a backtesting engine.**
4. **How the GA iteratively improves candidate rule sets** using fitness feedback from historical performance.

Although the demonstration uses only a subset of features, students may extend the system to incorporate the full set of engineered indicators produced in Phase 1.

---

## **Chromosome Structure and Representation**

Each chromosome represents a **complete ordered rule list**, where:

* Each rule consists of multiple **conditions**, specifying a feature, operator, and threshold.
* Each rule also encodes its **action parameters**:

  * Take-profit (TP),
  * Stop-loss (SL),
  * **Fraction of capital allocated to the trade** (position size).
* Activation flags control whether a rule or condition is included in the effective strategy.

The genetic representation enables exploration of a very large combinational search space while retaining **transparent, human-readable rule structures** once decoded.

---

## **Training Data and Features**

For this demonstration, we use only a small set of the available features to keep the example focused and computationally manageable.
However, students are encouraged to:

* Use **all available engineered features**,
* Modify or extend the chromosome structure,
* Experiment with different operator sets, thresholds, or rule depths.

The system is fully modular, and feature selection is handled implicitly via the genetic encoding.

---

## **Optimization and Fitness Function**

The GA is trained exclusively on the **training dataset**.
The objective function (fitness) is defined as:

> **Final equity obtained after backtesting the rule set**, starting with an initial capital of $1000.

This formulation incorporates:

* Profitability of trades,
* Trading frequency,
* Position sizing decisions,
* Compounding through equity updates.

After training, students must evaluate their discovered rule sets on the **separate test dataset** to assess out-of-sample performance and detect overfitting.

---

## **Student Instructions**

* You may run this notebook directly in **Google Colab**, but remember to mount your Google Drive before accessing datasets.
* All configuration parameters (e.g., population size, mutation rates, TP/SL ranges, position-size bounds) can be modified to explore different design choices.
* The implementation is modular; students may:

  * Add new features,
  * Adjust how rules are represented,
  * Implement alternative fitness functions,
  * Replace GA with other metaheuristic algorithms such as PSO or DE.

---

## **Educational Goals**

By working through this notebook, you will gain hands-on understanding of:

* How rule-based trading systems are formalized and optimized,
* How metaheuristic methods operate on structured, interpretable solutions,
* How backtesting interacts with rule logic, signal generation, and position management,
* How to evaluate trading strategies rigorously using both train and test datasets.

This forms the foundation for the **Phase 2 Rule Discovery Project**.

In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

In [2]:
import numpy as np
import json
import pandas as pd
from dataclasses import dataclass
from typing import List, Optional, Tuple
import random

In [3]:
# === CONFIG: rule structure, GA, and trading ===

MAX_RULES = 4         # N (max rules in the rule list)
MAX_CONDS = 2          # K (max conditions per rule)

TP_MIN, TP_MAX = 0.02, 0.05   # 2% .. 5%
SL_MIN, SL_MAX = 0.01, 0.04   # 1% .. 4%

STARTING_CAPITAL = 1000.0    # starting money for the strategy

POS_MIN_FRAC = 0.05          # 10% of capital per trade (min)
POS_MAX_FRAC = 0.30          # 50% of capital per trade (max)

POP_SIZE = 6
N_GENERATIONS = 40
TOURNAMENT_SIZE = 5
CROSSOVER_RATE = 0.85
MUTATION_RATE = 0.08   # base mutation probability per gene

RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

In [4]:
# === 1. Load ETH data with engineered features ===

def load_eth_features(csv_path: str,
                      feature_cols: List[str],
                      close_col: str = "close"
                      ) -> Tuple[pd.DataFrame, List[str]]:
    """
    Load ETH OHLCV + engineered features.
    We keep only 'close' and selected feature columns.

    Parameters
    ----------
    csv_path : str
        Path to CSV containing at least 'close' and feature columns.
    feature_cols : list of str
        Subset of ~50 features you want to use in the demo.
    close_col : str
        Name of the close price column.

    Returns
    -------
    df : pd.DataFrame
        DataFrame indexed by time with 'close' and selected features.
    feature_cols_used : list of str
        The actual feature columns we keep (intersection of requested + available).
    """
    df = pd.read_csv(csv_path, parse_dates=True, index_col=0)
    df.columns = [c.lower() for c in df.columns]

    if close_col.lower() not in df.columns:
        raise ValueError(f"Close column '{close_col}' not found in data.")

    # Intersect requested features with available columns
    feature_cols_lower = [c.lower() for c in feature_cols]
    available_features = [c for c in feature_cols_lower if c in df.columns]

    if len(available_features) == 0:
        raise ValueError("None of the requested feature columns are present in the data.")

    cols_to_keep = [close_col.lower()] + available_features
    df = df[cols_to_keep].sort_index()

    return df, available_features

In [5]:
# === 2. Gene and Rule structures (phenotype) ===

@dataclass
class Condition:
    # active: bool
    feature_idx: int
    operator: str      # "<" or ">"
    q: float    # ADD FOR QUANTILE
    threshold: float = None   # numeric threshold


@dataclass
class Rule:
    # active: bool
    conditions: List[Condition]
    side: str          # "BUY" or "SELL"
    tp: float          # take-profit as decimal (e.g. 0.03 for 3%)
    sl: float          # stop-loss as decimal
    size_frac: float   # fraction of current capital to allocate (0.0–1.0)

In [6]:
# === 2b. Chromosome representation (genotype) ===

@dataclass
class Chromosome:
    # Rule-level genes
    rule_active: np.ndarray      # (MAX_RULES,)  {0,1}
    side_gene: np.ndarray        # (MAX_RULES,)  {0,1}  0=BUY, 1=SELL
    tp_gene: np.ndarray          # (MAX_RULES,)  [0,1]
    sl_gene: np.ndarray          # (MAX_RULES,)  [0,1]
    size_gene: np.ndarray        # (MAX_RULES,)  [0,1]  --> position size fraction

    # Condition-level genes
    cond_active: np.ndarray      # (MAX_RULES, MAX_CONDS)  {0,1}
    feature_idx_gene: np.ndarray # (MAX_RULES, MAX_CONDS)
    operator_gene: np.ndarray    # (MAX_RULES, MAX_CONDS)  {0,1}
    # threshold_gene: np.ndarray   # (MAX_RULES, MAX_CONDS)  [0,1]
    q_gene: np.ndarray

In [7]:
# === 3. Mapping genes to actual TP/SL and thresholds ===

def map_tp_gene(tp_gene: float) -> float:
    """Map [0,1] gene to TP% in [TP_MIN, TP_MAX]."""
    return TP_MIN + tp_gene * (TP_MAX - TP_MIN)


def map_sl_gene(sl_gene: float) -> float:
    """Map [0,1] gene to SL% in [SL_MIN, SL_MAX]."""
    return SL_MIN + sl_gene * (SL_MAX - SL_MIN)


def map_size_gene(size_gene: float) -> float:
    """
    Map [0,1] size_gene to a fraction of capital to allocate per trade.
    For example, POS_MIN_FRAC=0.05, POS_MAX_FRAC=0.5 => 5%..50%.
    """
    return POS_MIN_FRAC + size_gene * (POS_MAX_FRAC - POS_MIN_FRAC)


def map_operator_gene(op_gene: int) -> str:
    """0 -> '<', 1 -> '>'."""
    return "<" if op_gene == 0 else ">"


def map_threshold_gene_to_value(feature_series: pd.Series, thr_gene: float) -> float:
    """
    Map a [0,1] threshold_gene to a numeric threshold using feature quantiles.

    thr_gene ~ 0.0 => low quantile (e.g. oversold RSI)
    thr_gene ~ 1.0 => high quantile (e.g. overbought RSI)
    """
    # np.nanquantile handles NaNs gracefully
    return float(np.nanquantile(feature_series.values, thr_gene))

In [8]:
def compute_condition_thresholds(
    rules,
    df_reference: pd.DataFrame,
    feature_cols
):
    """
    Compute real thresholds using quantiles from df_reference only.
    Prevents data leakage.
    """
    for r in rules:
        for cond in r.conditions:
            feat = feature_cols[cond.feature_idx]
            series = df_reference[feat].dropna()

            if len(series) == 0:
                cond.threshold = None
                continue

            q = min(max(cond.q, 0.01), 0.99)
            cond.threshold = float(series.quantile(q))


In [9]:
# === Decode Chromosome to Rule List (Quantile-based) ===

def decode_chromosome(
    chrom: Chromosome,
    df: pd.DataFrame,
    feature_cols: List[str]
) -> List[Rule]:
    """
    Decode chromosome into a list of Rule objects.
    IMPORTANT:
    - This function does NOT compute numeric thresholds.
    - It only assigns quantile genes (q).
    - Real thresholds are computed later using compute_condition_thresholds().
    """

    rules: List[Rule] = []

    for r in range(MAX_RULES):
        if chrom.rule_active[r] == 0:
            continue

        # --- rule-level genes ---
        side = "BUY" if chrom.side_gene[r] == 0 else "SELL"
        tp = map_tp_gene(chrom.tp_gene[r])
        sl = map_sl_gene(chrom.sl_gene[r])
        size_frac = map_size_gene(chrom.size_gene[r])

        conds: List[Condition] = []

        for c in range(MAX_CONDS):
            if chrom.cond_active[r, c] == 0:
                continue

            feat_idx = int(chrom.feature_idx_gene[r, c]) % len(feature_cols)
            op = map_operator_gene(int(chrom.operator_gene[r, c]))

            # ⭐ Quantile gene (NOT numeric threshold)
            q = float(chrom.q_gene[r, c])   # اگر اسم را عوض نکردی: threshold_gene

            conds.append(
                Condition(
                    feature_idx=feat_idx,
                    operator=op,
                    q=q,              # quantile ∈ (0,1)
                    threshold=None    # computed later
                )
            )

        # discard empty rules
        if len(conds) == 0:
            continue

        rules.append(
            Rule(
                conditions=conds,
                side=side,
                tp=tp,
                sl=sl,
                size_frac=size_frac,
            )
        )

    return rules

In [10]:
def rule_fires(
    rule: Rule,
    df: pd.DataFrame,
    feature_cols: List[str],
    t: int
) -> bool:
    """
    Check if a rule fires at row index t.
    All conditions must be true.

    Quantile-based note:
    - cond.threshold MUST be pre-computed (e.g., by compute_condition_thresholds).
    - If threshold is None, rule does not fire (prevents leakage / undefined behavior).
    """
    row = df.iloc[t]

    for cond in rule.conditions:
        # threshold must exist
        thr = getattr(cond, "threshold", None)
        if thr is None or np.isnan(thr):
            return False

        feat_name = feature_cols[cond.feature_idx]
        x = row[feat_name]

        # missing feature => don't fire
        if x is None or (isinstance(x, float) and np.isnan(x)):
            return False

        op = cond.operator
        if op == "<":
            if not (x < thr):
                return False
        elif op == ">":
            if not (x > thr):
                return False
        else:
            # unknown operator => safe fail
            return False

    return True

In [11]:
# def backtest_rule_list(
#     rules: List[Rule],
#     df: pd.DataFrame,
#     feature_cols: List[str],
#     starting_capital: float = STARTING_CAPITAL
# ) -> Tuple[List[float], float]:
#     """
#     Backtest a rule list with capital and position sizing.

#     Returns
#     -------
#     trade_returns : list of float
#         Per-trade returns (in % terms, like before).
#     final_equity : float
#         Final money after all trades.
#     """
#     if len(rules) == 0:
#         return [], starting_capital

#     close = df["close"].values
#     n = len(df)

#     equity = starting_capital

#     position = 0          # 0 = flat, +1 = long, -1 = short
#     entry_price = None
#     entry_rule: Optional[Rule] = None
#     entry_capital = None  # amount of capital allocated to this trade

#     trade_returns: List[float] = []

#     for t in range(n):
#         price = close[t]
#         if np.isnan(price):
#             continue

#         if position == 0:
#             # --- FLAT: look for entry ---
#             for rule in rules:
#                 if rule_fires(rule, df, feature_cols, t):
#                     # Capital to allocate = fraction of current equity
#                     size_frac = rule.size_frac
#                     trade_capital = equity * size_frac

#                     if trade_capital <= 0:
#                         break  # nothing to allocate

#                     position = 1 if rule.side == "BUY" else -1
#                     entry_price = price
#                     entry_rule = rule
#                     entry_capital = trade_capital
#                     break

#         else:
#             # --- IN POSITION: manage trade ---
#             assert entry_rule is not None and entry_price is not None and entry_capital is not None

#             # Return in direction of position (+ for profit)
#             ret = position * (price / entry_price - 1.0)

#             tp_hit = ret >= entry_rule.tp
#             sl_hit = ret <= -entry_rule.sl

#             if tp_hit or sl_hit:
#                 # Close trade
#                 pnl = ret * entry_capital      # profit or loss in dollars
#                 equity += pnl                  # update money
#                 trade_returns.append(ret)

#                 # Reset position
#                 position = 0
#                 entry_price = None
#                 entry_rule = None
#                 entry_capital = None

#     return trade_returns, equity

def backtest_rule_list(
    rules: List[Rule],
    df: pd.DataFrame,
    feature_cols: List[str],
    starting_capital: float = STARTING_CAPITAL
) -> Tuple[List[float], float, int]:
    """
    Backtest a rule list with capital and position sizing.

    Returns
    -------
    equity_curve : list of float
        True equity over time (incremental)
    final_equity : float
        Final money after all trades
    n_trades : int
        Number of closed trades
    """
    if len(rules) == 0:
        return [starting_capital], starting_capital, 0

    close = df["close"].values
    n = len(df)

    equity = float(starting_capital)
    equity_curve = [equity]

    position = 0          # 0 = flat, +1 = long, -1 = short
    entry_price = None
    entry_rule: Optional[Rule] = None
    entry_capital = None

    n_trades = 0

    for t in range(n):
        price = close[t]
        if np.isnan(price):
            equity_curve.append(equity)
            continue

        if position == 0:
            # --- FLAT: look for entry ---
            for rule in rules:
                if rule_fires(rule, df, feature_cols, t):
                    size_frac = rule.size_frac
                    trade_capital = equity * size_frac

                    if trade_capital <= 0:
                        break

                    position = 1 if rule.side == "BUY" else -1
                    entry_price = price
                    entry_rule = rule
                    entry_capital = trade_capital
                    break

        else:
            # --- IN POSITION: manage trade ---
            ret = position * (price / entry_price - 1.0)

            tp_hit = ret >= entry_rule.tp
            sl_hit = ret <= -entry_rule.sl

            if tp_hit or sl_hit:
                pnl = ret * entry_capital
                equity += pnl

                position = 0
                entry_price = None
                entry_rule = None
                entry_capital = None

                n_trades += 1

        equity_curve.append(equity)

    # --- force close at last price (important!) ---
    if position != 0 and entry_price is not None and entry_capital is not None:
        final_price = close[-1]
        if not np.isnan(final_price):
            ret = position * (final_price / entry_price - 1.0)
            pnl = ret * entry_capital
            equity += pnl
            equity_curve[-1] = equity
            n_trades += 1

    return equity_curve, equity, n_trades

In [12]:
# def compute_fitness(
#     chrom: Chromosome,
#     df: pd.DataFrame,
#     feature_cols: List[str]
# ) -> float:
#     """
#     Decode chromosome -> rule list -> backtest -> final equity.

#     Fitness = final money (higher is better).
#     """
#     rules = decode_chromosome(chrom, df, feature_cols)

#     trade_returns, final_equity = backtest_rule_list(
#         rules, df, feature_cols, starting_capital=STARTING_CAPITAL
#     )

#     # Optional: if you want to slightly penalize "do nothing" strategies:
#     if len(trade_returns) == 0:
#         return STARTING_CAPITAL - 1.0  # tiny penalty

#     return final_equity

def compute_fitness(
    chrom: Chromosome,
    df: pd.DataFrame,
    feature_cols: List[str]
) -> float:
    """
    Walk-forward + Quantile-based fitness.
    Robust against overfitting and scale drift.
    """

    # 1) Decode chromosome → rules (WITHOUT numeric thresholds)
    rules = decode_chromosome(chrom, df, feature_cols)

    # 2) Walk-forward split
    n = len(df)
    if n < 100:  # safety guard
        return -1e6

    split = int(0.7 * n)
    df_A = df.iloc[:split]
    df_B = df.iloc[split:]

    # 3) ⭐ Compute REAL thresholds from df_A ONLY (quantile-based)
    compute_condition_thresholds(rules, df_A, feature_cols)

    # 4) Backtest on A (train part)
    eq_A_curve, final_A, trades_A = backtest_rule_list(
        rules, df_A, feature_cols
    )

    # 5) Backtest on B (forward / pseudo-test)
    eq_B_curve, final_B, trades_B = backtest_rule_list(
        rules, df_B, feature_cols
    )

    # 6) Hard rejection (no-trade or degenerate strategies)
    if trades_A == 0 or trades_B == 0:
        return -1e6

    # 7) Drawdown on B only (future-facing risk)
    eq_B_curve = np.asarray(eq_B_curve, dtype=float)
    running_max = np.maximum.accumulate(eq_B_curve)
    drawdowns = (running_max - eq_B_curve) / np.clip(running_max, 1e-12, None)
    max_dd_B = float(np.max(drawdowns))

    # ---------------- penalties ----------------

    # (1) drawdown penalty (soft, realistic)
    LAMBDA_DD = 0.4
    penalty_dd = LAMBDA_DD * max_dd_B * STARTING_CAPITAL

    # (2) complexity penalty (rules + conditions)
    n_rules = len(rules)
    n_conds = sum(
        len(r.conditions)
        for r in rules
        if hasattr(r, "conditions") and r.conditions is not None
    )
    penalty_complexity = 1.2 * n_rules + 0.4 * n_conds

    # (3) trade-count pressure (soft, on B only)
    penalty_trades = 0.0
    if trades_B > 120:
        penalty_trades += 2.0 * (trades_B - 120)
    elif trades_B < 10:
        penalty_trades += 50.0

    # 8) Final fitness (future weighted more)
    fitness = (
        0.4 * final_A
        + 0.6 * final_B
        - penalty_dd
        - penalty_complexity
        - penalty_trades
    )

    return float(fitness)


In [13]:
# === 6. GA: initialization ===

def random_chromosome(n_features: int) -> Chromosome:
    rule_active = np.random.randint(0, 2, size=(MAX_RULES,))
    if rule_active.sum() == 0:
        rule_active[np.random.randint(0, MAX_RULES)] = 1

    side_gene = np.random.randint(0, 2, size=(MAX_RULES,))
    tp_gene = np.random.rand(MAX_RULES)
    sl_gene = np.random.rand(MAX_RULES)
    size_gene = np.random.rand(MAX_RULES)  # NEW: position size genes in [0,1]

    cond_active = np.random.randint(0, 2, size=(MAX_RULES, MAX_CONDS))
    for r in range(MAX_RULES):
        if rule_active[r] == 1 and cond_active[r].sum() == 0:
            cond_active[r, np.random.randint(0, MAX_CONDS)] = 1

    feature_idx_gene = np.random.randint(0, n_features, size=(MAX_RULES, MAX_CONDS))
    operator_gene = np.random.randint(0, 2, size=(MAX_RULES, MAX_CONDS))
    q_gene = np.random.rand(MAX_RULES, MAX_CONDS)

    return Chromosome(
        rule_active=rule_active,
        side_gene=side_gene,
        tp_gene=tp_gene,
        sl_gene=sl_gene,
        size_gene=size_gene,
        cond_active=cond_active,
        feature_idx_gene=feature_idx_gene,
        operator_gene=operator_gene,
        q_gene=q_gene,   # ⭐
    )

In [14]:
# === 6b. Parent selection (tournament) ===

def tournament_selection(population: List[Chromosome],
                         fitnesses: List[float],
                         k: int = TOURNAMENT_SIZE) -> Chromosome:
    """
    Tournament selection: pick k random individuals, return the best.
    """
    idxs = np.random.choice(len(population), size=k, replace=False)
    best_idx = idxs[0]
    best_fit = fitnesses[best_idx]
    for i in idxs[1:]:
        if fitnesses[i] > best_fit:
            best_fit = fitnesses[i]
            best_idx = i
    return population[best_idx]


In [15]:
def crossover(
    parent1: Chromosome,
    parent2: Chromosome
) -> Tuple[Chromosome, Chromosome]:
    """
    Rule-aware uniform crossover with gentle condition mixing.
    Quantile-aware (uses q_gene instead of threshold_gene).
    """

    # ---------- no crossover → clone ----------
    if np.random.rand() >= CROSSOVER_RATE:
        child1 = Chromosome(
            rule_active=parent1.rule_active.copy(),
            side_gene=parent1.side_gene.copy(),
            tp_gene=parent1.tp_gene.copy(),
            sl_gene=parent1.sl_gene.copy(),
            size_gene=parent1.size_gene.copy(),
            cond_active=parent1.cond_active.copy(),
            feature_idx_gene=parent1.feature_idx_gene.copy(),
            operator_gene=parent1.operator_gene.copy(),
            q_gene=parent1.q_gene.copy(),
        )
        child2 = Chromosome(
            rule_active=parent2.rule_active.copy(),
            side_gene=parent2.side_gene.copy(),
            tp_gene=parent2.tp_gene.copy(),
            sl_gene=parent2.sl_gene.copy(),
            size_gene=parent2.size_gene.copy(),
            cond_active=parent2.cond_active.copy(),
            feature_idx_gene=parent2.feature_idx_gene.copy(),
            operator_gene=parent2.operator_gene.copy(),
            q_gene=parent2.q_gene.copy(),
        )
        return child1, child2

    # ---------- create empty children ----------
    child1 = Chromosome(
        rule_active=np.zeros_like(parent1.rule_active),
        side_gene=np.zeros_like(parent1.side_gene),
        tp_gene=np.zeros_like(parent1.tp_gene),
        sl_gene=np.zeros_like(parent1.sl_gene),
        size_gene=np.zeros_like(parent1.size_gene),
        cond_active=np.zeros_like(parent1.cond_active),
        feature_idx_gene=np.zeros_like(parent1.feature_idx_gene),
        operator_gene=np.zeros_like(parent1.operator_gene),
        q_gene=np.zeros_like(parent1.q_gene),
    )

    child2 = Chromosome(
        rule_active=np.zeros_like(parent1.rule_active),
        side_gene=np.zeros_like(parent1.side_gene),
        tp_gene=np.zeros_like(parent1.tp_gene),
        sl_gene=np.zeros_like(parent1.sl_gene),
        size_gene=np.zeros_like(parent1.size_gene),
        cond_active=np.zeros_like(parent1.cond_active),
        feature_idx_gene=np.zeros_like(parent1.feature_idx_gene),
        operator_gene=np.zeros_like(parent1.operator_gene),
        q_gene=np.zeros_like(parent1.q_gene),
    )

    # ---------- rule-level uniform crossover ----------
    for r in range(MAX_RULES):
        if np.random.rand() < 0.5:
            src1, src2 = parent1, parent2
        else:
            src1, src2 = parent2, parent1

        # rule-level genes
        child1.rule_active[r] = src1.rule_active[r]
        child1.side_gene[r]   = src1.side_gene[r]
        child1.tp_gene[r]     = src1.tp_gene[r]
        child1.sl_gene[r]     = src1.sl_gene[r]
        child1.size_gene[r]   = src1.size_gene[r]

        child2.rule_active[r] = src2.rule_active[r]
        child2.side_gene[r]   = src2.side_gene[r]
        child2.tp_gene[r]     = src2.tp_gene[r]
        child2.sl_gene[r]     = src2.sl_gene[r]
        child2.size_gene[r]   = src2.size_gene[r]

        # ---------- condition-level gentle mixing ----------
        for c in range(MAX_CONDS):

            # ---- child1 ----
            if child1.rule_active[r] == 0:
                # inactive rule → copy directly
                child1.cond_active[r, c]      = src1.cond_active[r, c]
                child1.feature_idx_gene[r, c] = src1.feature_idx_gene[r, c]
                child1.operator_gene[r, c]    = src1.operator_gene[r, c]
                child1.q_gene[r, c]           = src1.q_gene[r, c]
            else:
                donor = src1 if np.random.rand() < 0.7 else src2
                child1.cond_active[r, c]      = donor.cond_active[r, c]
                child1.feature_idx_gene[r, c] = donor.feature_idx_gene[r, c]
                child1.operator_gene[r, c]    = donor.operator_gene[r, c]
                child1.q_gene[r, c]           = donor.q_gene[r, c]

            # ---- child2 ----
            if child2.rule_active[r] == 0:
                child2.cond_active[r, c]      = src2.cond_active[r, c]
                child2.feature_idx_gene[r, c] = src2.feature_idx_gene[r, c]
                child2.operator_gene[r, c]    = src2.operator_gene[r, c]
                child2.q_gene[r, c]           = src2.q_gene[r, c]
            else:
                donor = src2 if np.random.rand() < 0.7 else src1
                child2.cond_active[r, c]      = donor.cond_active[r, c]
                child2.feature_idx_gene[r, c] = donor.feature_idx_gene[r, c]
                child2.operator_gene[r, c]    = donor.operator_gene[r, c]
                child2.q_gene[r, c]           = donor.q_gene[r, c]

    return child1, child2

In [16]:
def mutate(chrom: Chromosome, n_features: int):
    """
    Quantile-aware, rule-aware mutation.
    """

    for r in range(MAX_RULES):

        # ---- mutate rule-level genes ----
        if np.random.rand() < MUTATION_RATE:
            chrom.rule_active[r] = 1 - chrom.rule_active[r]

        if np.random.rand() < MUTATION_RATE:
            chrom.side_gene[r] = 1 - chrom.side_gene[r]

        if np.random.rand() < MUTATION_RATE:
            chrom.tp_gene[r] = np.clip(
                chrom.tp_gene[r] + np.random.normal(scale=0.05),
                0.001,
                0.10,
            )

        if np.random.rand() < MUTATION_RATE:
            chrom.sl_gene[r] = np.clip(
                chrom.sl_gene[r] + np.random.normal(scale=0.05),
                0.001,
                0.10,
            )

        if np.random.rand() < MUTATION_RATE:
            chrom.size_gene[r] = np.clip(
                chrom.size_gene[r] + np.random.normal(scale=0.05),
                0.05,
                0.50,
            )

        # ---- mutate condition-level genes ----
        for c in range(MAX_CONDS):

            # only mutate conditions of active rules
            if chrom.rule_active[r] == 0:
                continue

            if np.random.rand() < MUTATION_RATE:
                chrom.cond_active[r, c] = 1 - chrom.cond_active[r, c]

            if np.random.rand() < MUTATION_RATE:
                chrom.feature_idx_gene[r, c] = np.random.randint(0, n_features)

            if np.random.rand() < MUTATION_RATE:
                chrom.operator_gene[r, c] = 1 - chrom.operator_gene[r, c]

            # ⭐ Quantile mutation (replaces threshold_gene)
            if np.random.rand() < MUTATION_RATE:
                chrom.q_gene[r, c] = np.clip(
                    chrom.q_gene[r, c] + np.random.normal(scale=0.08),
                    0.05,
                    0.95,
                )

In [17]:
# === 7. GA main loop ===

def run_ga(df: pd.DataFrame,
           feature_cols: List[str]
           ) -> Tuple[Chromosome, float]:
    """
    Run a simple GA to discover a good rule list.

    Returns best_chromosome, best_fitness.
    """
    n_features = len(feature_cols)

    # --- Initialize population ---


    initial_pop_size = POP_SIZE * 5
    population: List[Chromosome] = [
        random_chromosome(n_features) for _ in range(initial_pop_size)
    ]

    # Evaluate initial population
    fitnesses = [
        compute_fitness(chrom, df, feature_cols) for chrom in population
    ]

    sorted_idx = np.argsort(fitnesses)[::-1]  # descending
    population = [population[i] for i in sorted_idx[:POP_SIZE]]
    fitnesses = [fitnesses[i] for i in sorted_idx[:POP_SIZE]]


    best_idx = int(np.argmax(fitnesses))
    best_chrom = population[best_idx]
    best_fit = fitnesses[best_idx]

    print(f"Initial best fitness (from expanded pool): {best_fit:.6f}")


    for gen in range(1, N_GENERATIONS + 1):
        new_population: List[Chromosome] = []

        # --- Elitism: keep the best individual ---
        new_population.append(best_chrom)

        # --- Generate rest of population ---
        while len(new_population) < POP_SIZE:
            # Select parents
            p1 = tournament_selection(population, fitnesses)
            p2 = tournament_selection(population, fitnesses)

            # Crossover
            child1, child2 = crossover(p1, p2)

            # Mutation
            mutate(child1, n_features)
            mutate(child2, n_features)

            new_population.append(child1)
            if len(new_population) < POP_SIZE:
                new_population.append(child2)

        population = new_population
        fitnesses = [
            compute_fitness(chrom, df, feature_cols) for chrom in population
        ]

        gen_best_idx = int(np.argmax(fitnesses))
        gen_best_fit = fitnesses[gen_best_idx]

        # Update global best
        if gen_best_fit > best_fit:
            best_fit = gen_best_fit
            best_chrom = population[gen_best_idx]

        print(f"Generation {gen:3d}: best fitness = {gen_best_fit:.6f}, global best = {best_fit:.6f}")

    return best_chrom, best_fit


In [18]:
def pretty_print_rules(rules, feature_cols):
    for i, rule in enumerate(rules, 1):
        cond_strs = []
        for cond in rule.conditions:
            feat_name = feature_cols[cond.feature_idx]

            if cond.threshold is None:
                cond_strs.append(
                    f"{feat_name} {cond.operator} q={cond.q:.2f}"
                )
            else:
                cond_strs.append(
                    f"{feat_name} {cond.operator} {cond.threshold:.4f}"
                )

        cond_part = " AND ".join(cond_strs)
        print(f"Rule {i}: IF {cond_part}")
        print(
            f"    THEN {rule.side} "
            f"TP={rule.tp:.3f} SL={rule.sl:.3f} SIZE={rule.size_frac:.2f}"
        )

# Main program

In [19]:
# Example usage (you adapt the paths and feature names):
FEATURE_COLS = [
    "vol_var_20",
    "cand_close_open_ratio_1",
    "rob_median_abs_dev_30",
    "trend_ema_12",
    "mom_roc_10",
    "vol_cmf_20",
    "rob_iqr_20",
    "trend_ema_26",
    "vol_mfi_14",
    "rob_kurt_30",
    "trend_sma_20",
    "rob_hurst_100",
    "vol_pvo_12_26",
    "vol_vpt_1",
    "vol_std_20",
    "cand_shadow_lower_1",
    "trend_tema_20",
    "trend_hma_21",
    "cand_range_1",
    "vol_zclose_60",
    "mom_willr_14",
    "mom_macd_12_26",
    "mom_stoch_d_14_3_3",
    "cand_up_down_vol_ratio_20",
    "rob_autocorr_20",
    "ent_return_30",
    "vol_bbw_20_2",
    "vol_logret_std_20",
    "vol_range_ratio_14",
    "cand_shadow_upper_1",
    "mom_ppo_12_26",
    "trend_wma_14",
    "vol_high_low_corr_20",
    "vol_vroc_10",
    "mom_rsi_14"
]

DoNotUse_FEATURE_COLS = [
    "pivot_dynamic_reversal_score_20",
    "trend_tl_confluence",
    "trend_trendlines_bear_cross_5_10",
    "relative_strength_pair_20",
    "calmar_ratio_50",
    "beta_to_index_50",
    "corr_with_index_50",
    "inter_corr_eth_50",
    "fear_greed_index_5m",
    "inter_sp500_corr_50",
    "inter_market_ratio_gold",
    "sentiment_smooth_index",
]

In [20]:
# === Data Preprocessing Function ===

def preprocess_trading_data(
    df: pd.DataFrame,
    feature_cols: List[str],
    is_train: bool = True,
    scaler_params: Optional[dict] = None
) -> Tuple[pd.DataFrame, dict]:
    """
    Comprehensive preprocessing for trading data with features.
    
    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with 'close' and feature columns.
    feature_cols : list of str
        List of feature column names to preprocess.
    is_train : bool
        If True, compute statistics from data. If False, use provided scaler_params.
    scaler_params : dict, optional
        Dictionary containing scaling parameters (mean, std, min, max, etc.) from training set.
        Required when is_train=False.
    
    Returns
    -------
    df_processed : pd.DataFrame
        Preprocessed DataFrame.
    scaler_params : dict
        Dictionary with scaling parameters for each feature (for use on test set).
    """
    df_processed = df.copy()
    
    if scaler_params is None:
        scaler_params = {}
    
    # 1. Handle infinite values (replace with NaN for proper handling)
    print(f"Processing {'TRAIN' if is_train else 'TEST'} data...")
    for col in feature_cols:
        if col in df_processed.columns:
            # Replace inf/-inf with NaN
            df_processed[col] = df_processed[col].replace([np.inf, -np.inf], np.nan)
    
    # 2. Handle missing values
    # Strategy: Forward fill first (use previous valid value), then backward fill, then fill remaining with median
    for col in feature_cols:
        if col in df_processed.columns:
            n_missing_before = df_processed[col].isna().sum()
            
            if n_missing_before > 0:
                # Forward fill (use last valid observation)
                df_processed[col] = df_processed[col].ffill()
                
                # Backward fill for any remaining NaNs at the start
                df_processed[col] = df_processed[col].bfill()
                
                # If still NaNs exist, fill with median (computed from train or stored)
                if df_processed[col].isna().any():
                    if is_train:
                        median_val = df_processed[col].median()
                        scaler_params[f"{col}_median"] = median_val
                    else:
                        median_val = scaler_params.get(f"{col}_median", 0.0)
                    
                    df_processed[col] = df_processed[col].fillna(median_val)
                
                n_missing_after = df_processed[col].isna().sum()
                if n_missing_before > 0:
                    print(f"  {col}: filled {n_missing_before} missing values -> {n_missing_after} remaining")
    
    # 3. Remove outliers (clip to reasonable ranges based on percentiles)
    # This prevents extreme values from distorting the strategy
    for col in feature_cols:
        if col in df_processed.columns:
            if is_train:
                # Compute 1st and 99th percentiles
                lower_bound = df_processed[col].quantile(0.01)
                upper_bound = df_processed[col].quantile(0.99)
                scaler_params[f"{col}_lower"] = lower_bound
                scaler_params[f"{col}_upper"] = upper_bound
            else:
                lower_bound = scaler_params.get(f"{col}_lower", df_processed[col].min())
                upper_bound = scaler_params.get(f"{col}_upper", df_processed[col].max())
            
            # Clip values
            df_processed[col] = df_processed[col].clip(lower=lower_bound, upper=upper_bound)
    
    # 4. Standardization (Z-score normalization)
    # This ensures all features have similar scales for threshold mapping
    for col in feature_cols:
        if col in df_processed.columns:
            if is_train:
                # Compute mean and std from training data
                mean_val = df_processed[col].mean()
                std_val = df_processed[col].std()
                
                # Avoid division by zero
                if std_val == 0 or np.isnan(std_val):
                    std_val = 1.0
                
                scaler_params[f"{col}_mean"] = mean_val
                scaler_params[f"{col}_std"] = std_val
            else:
                # Use stored parameters from training
                mean_val = scaler_params.get(f"{col}_mean", 0.0)
                std_val = scaler_params.get(f"{col}_std", 1.0)
            
            # Apply standardization
            df_processed[col] = (df_processed[col] - mean_val) / std_val
    
    # 5. Final check: ensure no NaNs or infs remain
    for col in feature_cols:
        if col in df_processed.columns:
            n_invalid = df_processed[col].isna().sum() + np.isinf(df_processed[col]).sum()
            if n_invalid > 0:
                print(f"  WARNING: {col} still has {n_invalid} invalid values after preprocessing!")
                # Final fallback: fill with 0
                df_processed[col] = df_processed[col].replace([np.inf, -np.inf], 0).fillna(0)
    
    print(f"Preprocessing complete. Shape: {df_processed.shape}")
    print(f"Date range: {df_processed.index[0]} to {df_processed.index[-1]}")
    print(f"Number of features: {len(feature_cols)}")
    
    return df_processed, scaler_params

In [21]:
# Load raw data
df, FEATURE_COLS = load_eth_features("./eth_5m_with_features.csv", list(set(FEATURE_COLS) - set(DoNotUse_FEATURE_COLS)))
df_test, FEATURE_COLS = load_eth_features("./eth_5m_with_features_test.csv", list(set(FEATURE_COLS) - set(DoNotUse_FEATURE_COLS)))

print(f"Raw train data shape: {df.shape}")
print(f"Raw test data shape: {df_test.shape}")

# Apply preprocessing
df, scaler_params = preprocess_trading_data(df, FEATURE_COLS, is_train=True)
df_test, _ = preprocess_trading_data(df_test, FEATURE_COLS, is_train=False, scaler_params=scaler_params)

print(f"\nFinal train data shape: {df.shape}")
print(f"Final test data shape: {df_test.shape}")

Raw train data shape: (52992, 36)
Raw test data shape: (26208, 36)
Processing TRAIN data...
  rob_median_abs_dev_30: filled 29 missing values -> 0 remaining
  vol_mfi_14: filled 15 missing values -> 0 remaining
  vol_logret_std_20: filled 20 missing values -> 0 remaining
  mom_rsi_14: filled 14 missing values -> 0 remaining
  vol_range_ratio_14: filled 13 missing values -> 0 remaining
  mom_stoch_d_14_3_3: filled 17 missing values -> 0 remaining
  trend_tema_20: filled 57 missing values -> 0 remaining
  trend_ema_26: filled 25 missing values -> 0 remaining
  mom_ppo_12_26: filled 25 missing values -> 0 remaining
  trend_hma_21: filled 23 missing values -> 0 remaining
  vol_high_low_corr_20: filled 19 missing values -> 0 remaining
  trend_wma_14: filled 13 missing values -> 0 remaining
  vol_bbw_20_2: filled 19 missing values -> 0 remaining
  trend_ema_12: filled 11 missing values -> 0 remaining
  vol_pvo_12_26: filled 25 missing values -> 0 remaining
  vol_vroc_10: filled 10 missing va

In [22]:
# 1) Run GA
best_chrom, best_fit = run_ga(df, FEATURE_COLS)
print(f"\nBest fitness found: {best_fit:.6f}")

# 2) Decode and show final rule list
best_rules = decode_chromosome(best_chrom, df, FEATURE_COLS)
compute_condition_thresholds(best_rules, df, FEATURE_COLS)

pretty_print_rules(best_rules, FEATURE_COLS)

# 3) Evaluate on TRAIN and TEST (3-month out-of-sample)
train_equity_curve, final_train_eq, n_train_trades = backtest_rule_list(
    best_rules, df, FEATURE_COLS
)

test_equity_curve, final_test_eq, n_test_trades = backtest_rule_list(
    best_rules, df_test, FEATURE_COLS
)

print(f"Train final equity: {final_train_eq:.2f}, Number of positions: {n_train_trades}")
print(f"Test  final equity: {final_test_eq:.2f}, Number of positions: {n_test_trades}")

# 4) Export best rules to JSON (schema-compliant)
rules_json = {"rules": []}

for rule in best_rules:
    rule_dict = {"conditions": [], "action": {}}

    for cond in rule.conditions:
        feat_name = FEATURE_COLS[cond.feature_idx]
        rule_dict["conditions"].append({
            "feature": feat_name,
            "op": cond.operator,
            "threshold": cond.threshold
        })

    rule_dict["action"] = {
        "side": rule.side,
        "tp": rule.tp,
        "sl": rule.sl,
        "size": rule.size_frac
    }

    rules_json["rules"].append(rule_dict)

with open("rules_G09.json", "w") as f:
    json.dump(rules_json, f, indent=4)

print("Best rules exported to rules_G09.json")

Initial best fitness (from expanded pool): 1089.695526
Generation   1: best fitness = 1121.059122, global best = 1121.059122
Generation   2: best fitness = 1121.059122, global best = 1121.059122
Generation   3: best fitness = 1121.059122, global best = 1121.059122
Generation   4: best fitness = 1121.059122, global best = 1121.059122
Generation   5: best fitness = 1121.059122, global best = 1121.059122
Generation   6: best fitness = 1121.059122, global best = 1121.059122
Generation   7: best fitness = 1124.291418, global best = 1124.291418
Generation   8: best fitness = 1133.863289, global best = 1133.863289
Generation   9: best fitness = 1133.863289, global best = 1133.863289
Generation  10: best fitness = 1133.863289, global best = 1133.863289
Generation  11: best fitness = 1133.863289, global best = 1133.863289
Generation  12: best fitness = 1133.863289, global best = 1133.863289
Generation  13: best fitness = 1133.863289, global best = 1133.863289
Generation  14: best fitness = 1133