<a href="https://colab.research.google.com/github/onlyforthesis/114-/blob/main/20241203%E5%9F%BA%E5%9B%A0%E6%BC%94%E7%AE%97%E6%B3%95.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install deap numpy yfinance

Collecting deap
  Downloading deap-1.4.3-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Downloading deap-1.4.3-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (135 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.6/135.6 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: deap
Successfully installed deap-1.4.3


In [2]:
import yfinance as yf
import numpy as np
import pandas as pd
from deap import base, creator, tools, algorithms
import random
import warnings
warnings.filterwarnings('ignore')

class GeneticTradingStrategy:
    """Search trading-rule parameters for one ticker with a DEAP genetic algorithm.

    An individual is the list ``[m, h_time, target_profit, alpha]``: number
    of price intervals, holding period in bars, target return and entry
    threshold.  Construction downloads price history through ``yfinance``,
    derives the gene ranges from that history and prepares the DEAP toolbox.

    NOTE(review): ``_simulate_trades`` decides entries using the close
    ``h_time`` bars in the future, so every reported figure is a look-ahead
    result, not tradable performance.
    """

    # Per-trade return subtracted from the mean in the Sharpe-style ratio.
    RISK_FREE_PER_TRADE = 0.004

    def __init__(self, symbol, period="5y", population_size=50, generations=30):
        """Load data for ``symbol`` and prepare gene ranges and the toolbox.

        Args:
            symbol: Ticker passed to ``yfinance.Ticker``.
            period: History period passed to ``Ticker.history``.
            population_size: Number of individuals per generation.
            generations: Number of generations run by ``optimize``.
        """
        self.symbol = symbol
        self.period = period
        self.population_size = population_size
        self.generations = generations
        self.data = self._load_data()

        # Market characteristics that drive the gene ranges below.
        self.price_volatility = self._calculate_volatility()
        self.avg_volume = self._calculate_avg_volume()

        # Allowed range for each gene.
        self.M_RANGE = self._calculate_interval_range()
        self.H_TIME_RANGE = self._calculate_holding_range()
        self.TARGET_PROFIT_RANGE = self._calculate_profit_range()
        self.ALPHA_RANGE = self._calculate_alpha_range()

        self.toolbox = self._setup_toolbox()

    def _load_data(self):
        """Download price history and append SMA20, SMA60, RSI, MACD and Signal.

        Raises:
            ValueError: If the download returns no rows.
        """
        ticker = yf.Ticker(self.symbol)
        data = ticker.history(period=self.period)
        if data.empty:
            # Fail here with a clear message instead of producing NaN ranges
            # and obscure errors later in the optimisation.
            raise ValueError(
                f"No price history returned for {self.symbol!r} "
                f"(period={self.period!r})"
            )

        close = data['Close']
        data['SMA20'] = close.rolling(window=20).mean()
        data['SMA60'] = close.rolling(window=60).mean()
        data['RSI'] = self._calculate_rsi(close)
        data['MACD'], data['Signal'] = self._calculate_macd(close)

        return data

    def _calculate_rsi(self, prices, period=14):
        """Return the RSI of ``prices`` using simple rolling means of gains and losses."""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))

    def _calculate_macd(self, prices, fast=12, slow=26, signal=9):
        """Return ``(macd, signal_line)`` computed from exponential moving averages."""
        fast_ema = prices.ewm(span=fast, adjust=False).mean()
        slow_ema = prices.ewm(span=slow, adjust=False).mean()
        macd = fast_ema - slow_ema
        signal_line = macd.ewm(span=signal, adjust=False).mean()
        return macd, signal_line

    def _calculate_volatility(self):
        """Return the standard deviation of bar-to-bar close returns."""
        return self.data['Close'].pct_change().std()

    def _calculate_avg_volume(self):
        """Return the mean of the ``Volume`` column."""
        return self.data['Volume'].mean()

    def _calculate_interval_range(self):
        """Return the ``(min, max)`` interval count chosen from the volatility level."""
        if self.price_volatility > 0.02:
            return (5, 15)
        elif self.price_volatility < 0.01:
            return (2, 7)
        return (3, 10)

    def _calculate_holding_range(self):
        """Return the ``(min, max)`` holding period chosen from the volatility level."""
        if self.price_volatility > 0.02:
            return (3, 15)
        elif self.price_volatility < 0.01:
            return (10, 40)
        return (5, 30)

    def _calculate_profit_range(self):
        """Return the target-profit range: 0.5x to 2x of five times the volatility."""
        base_profit = self.price_volatility * 5
        return (base_profit * 0.5, base_profit * 2)

    def _calculate_alpha_range(self):
        """Return the entry-threshold range chosen from the average volume."""
        if self.avg_volume > 1000000:
            return (0.6, 0.9)
        return (0.5, 0.8)

    def _create_individual(self):
        """Return a random gene list ``[m, h_time, target_profit, alpha]``."""
        return [
            random.randint(*self.M_RANGE),              # m: number of intervals
            random.randint(*self.H_TIME_RANGE),         # h_time: holding period
            random.uniform(*self.TARGET_PROFIT_RANGE),  # target profit
            random.uniform(*self.ALPHA_RANGE)           # alpha: entry threshold
        ]

    def _setup_toolbox(self):
        """Build the DEAP toolbox used by ``optimize``."""
        # ``creator.create`` registers classes globally; creating them again
        # (second instance, re-run notebook cell) overwrites the old class
        # and emits a warning, so only create what is missing.
        if not hasattr(creator, "FitnessMax"):
            creator.create("FitnessMax", base.Fitness, weights=(1.0,))
        if not hasattr(creator, "Individual"):
            creator.create("Individual", list, fitness=creator.FitnessMax)

        toolbox = base.Toolbox()
        toolbox.register("individual", tools.initIterate, creator.Individual, self._create_individual)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual)

        toolbox.register("evaluate", self._evaluate_strategy)
        toolbox.register("mate", self._custom_crossover)
        toolbox.register("mutate", self._custom_mutation)
        toolbox.register("select", tools.selTournament, tournsize=3)

        return toolbox

    def _clip_genes(self, genes):
        """Clamp every gene of ``genes`` into its configured range, in place."""
        genes[0] = int(np.clip(genes[0], *self.M_RANGE))
        genes[1] = int(np.clip(genes[1], *self.H_TIME_RANGE))
        genes[2] = float(np.clip(genes[2], *self.TARGET_PROFIT_RANGE))
        genes[3] = float(np.clip(genes[3], *self.ALPHA_RANGE))
        return genes

    def _custom_crossover(self, ind1, ind2):
        """Two-point crossover on copies of the parents; returns two new individuals."""
        child1, child2 = tools.cxTwoPoint(list(ind1), list(ind2))

        # Keep every gene inside its valid range.
        self._clip_genes(child1)
        self._clip_genes(child2)

        return creator.Individual(child1), creator.Individual(child2)

    def _custom_mutation(self, individual):
        """Mutate each gene independently with probability 0.25, in place.

        Returns:
            A one-element tuple holding the mutated individual.
        """
        # Interval count: step of -1..1.
        if random.random() < 0.25:
            individual[0] += random.randint(-1, 1)
            individual[0] = int(np.clip(individual[0], *self.M_RANGE))

        # Holding period: step of -2..2.
        if random.random() < 0.25:
            individual[1] += random.randint(-2, 2)
            individual[1] = int(np.clip(individual[1], *self.H_TIME_RANGE))

        # Target profit: Gaussian step, sigma 0.01.
        if random.random() < 0.25:
            individual[2] += random.gauss(0, 0.01)
            individual[2] = float(np.clip(individual[2], *self.TARGET_PROFIT_RANGE))

        # Entry threshold: Gaussian step, sigma 0.05.
        if random.random() < 0.25:
            individual[3] += random.gauss(0, 0.05)
            individual[3] = float(np.clip(individual[3], *self.ALPHA_RANGE))

        return individual,

    def _price_intervals(self, m):
        """Split ``[min Low, max High]`` into ``m`` equal ``(lower, upper)`` intervals."""
        min_price = self.data['Low'].min()
        price_range = self.data['High'].max() - min_price
        interval_length = price_range / m
        return [(min_price + i * interval_length, min_price + (i + 1) * interval_length)
                for i in range(m)]

    def _simulate_trades(self, individual):
        """Replay the rule over the history and return a list of trade dicts.

        Each trade has ``entry_date``, ``exit_date``, ``entry_price``,
        ``exit_price`` and ``profit`` (fractional return over ``h_time`` bars).
        """
        m, h_time, target_profit, alpha = individual
        intervals = self._price_intervals(m)

        # Pull the column out once: per-bar ``.iloc`` lookups dominate the
        # run time because this loop runs for every individual.
        close = self.data['Close'].to_numpy()
        dates = self.data.index
        trades = []

        # Starts at bar 60, after the 60-bar moving average has a value.
        for i in range(60, len(close) - h_time):
            current_price = close[i]
            future_price = close[i + h_time]

            # Interval bounds are half-open, so a close outside all of them
            # (e.g. exactly at the top of the range) produces no trade.
            if not any(lower <= current_price < upper for lower, upper in intervals):
                continue

            potential_profit = (future_price - current_price) / current_price

            if (self._check_basic_conditions(potential_profit, target_profit, alpha) and
                    self._check_technical_conditions(i) and
                    self._check_risk_conditions(i, current_price)):
                trades.append({
                    'entry_date': dates[i],
                    'exit_date': dates[i + h_time],
                    'entry_price': current_price,
                    'exit_price': future_price,
                    'profit': potential_profit
                })

        return trades

    def _check_basic_conditions(self, potential_profit, target_profit, alpha):
        """Return True when the return exceeds both ``target_profit`` and ``alpha``.

        NOTE(review): ``alpha`` is drawn from 0.5-0.9 (see
        ``_calculate_alpha_range``), so this requires a return above 50% and
        will rarely pass -- confirm whether ``alpha`` was meant to be
        compared with a return at all.
        """
        return bool(potential_profit > target_profit and potential_profit > alpha)

    def _check_technical_conditions(self, i):
        """Return True when trend, RSI and MACD conditions all hold at bar ``i``."""
        row = self.data.iloc[i]
        prev = self.data.iloc[i - 1]

        # Up-trend: SMA20 above SMA60 and close above SMA20.
        trend_up = row['SMA20'] > row['SMA60'] and row['Close'] > row['SMA20']

        # RSI strictly between 30 and 70.
        rsi_good = 30 < row['RSI'] < 70

        # MACD crossed above its signal line on this bar.
        macd_good = row['MACD'] > row['Signal'] and prev['MACD'] <= prev['Signal']

        return bool(trend_up and rsi_good and macd_good)

    def _check_risk_conditions(self, i, current_price):
        """Return True when the 20 bars before ``i`` pass the risk filters.

        Filters: return volatility below 0.02, current volume above the
        window mean, and price between 30% and 70% of the window's
        low-to-high span.
        """
        window = self.data.iloc[i - 20:i]

        volatility = window['Close'].pct_change().std()
        volume_good = self.data['Volume'].iloc[i] > window['Volume'].mean()

        window_low = window['Low'].min()
        window_span = window['High'].max() - window_low
        # A flat (or NaN) window has no meaningful price position.
        if not window_span > 0:
            return False
        price_position = (current_price - window_low) / window_span

        return bool(volatility < 0.02 and volume_good and 0.3 < price_position < 0.7)

    def _calculate_max_drawdown(self, trades):
        """Return the largest peak-to-trough drop of the additive equity curve.

        Equity starts at 1.0 and grows by each trade's ``profit``.  Anchoring
        the peak at the starting equity keeps the denominator positive; a
        peak taken over cumulative profit alone can be zero or negative and
        yields NaN, infinite or negative drawdowns.
        """
        if not trades:
            return 0

        equity = 1.0 + pd.Series([t['profit'] for t in trades], dtype=float).cumsum()
        peak = equity.cummax().clip(lower=1.0)
        return ((peak - equity) / peak).max()

    def _sharpe_ratio(self, profits):
        """Return ``(mean - RISK_FREE_PER_TRADE) / std`` or 0 when undefined."""
        if len(profits) < 2:
            return 0
        returns = pd.Series(profits, dtype=float)
        spread = returns.std()
        # Identical returns give a zero spread; avoid an infinite ratio.
        if not np.isfinite(spread) or spread < 1e-12:
            return 0
        return (returns.mean() - self.RISK_FREE_PER_TRADE) / spread

    def _calculate_stats(self, trades):
        """Return ``(total_profit, max_drawdown, win_rate, sharpe_ratio)``.

        All four values are 0 when ``trades`` is empty.
        """
        if not trades:
            return 0, 0, 0, 0

        profits = [t['profit'] for t in trades]

        total_profit = sum(profits)
        max_drawdown = self._calculate_max_drawdown(trades)
        win_rate = sum(1 for p in profits if p > 0) / len(profits)
        sharpe_ratio = self._sharpe_ratio(profits)

        return total_profit, max_drawdown, win_rate, sharpe_ratio

    def _calculate_fitness(self, total_profit, n_trades, max_drawdown, win_rate, sharpe_ratio):
        """Return the weighted sum of the normalised metrics (range 0-1)."""
        # Weights of the five components; they sum to 1.
        w_profit = 0.4
        w_trades = 0.1
        w_drawdown = 0.2
        w_winrate = 0.2
        w_sharpe = 0.1

        # Map each metric onto 0-1; drawdown is inverted so lower is better.
        norm_profit = self._normalize(total_profit, 0, 0.5)
        norm_trades = self._normalize(n_trades, 10, 100)
        norm_drawdown = 1 - self._normalize(max_drawdown, 0, 0.2)
        norm_winrate = self._normalize(win_rate, 0.5, 1)
        norm_sharpe = self._normalize(sharpe_ratio, 1, 3)

        fitness = (w_profit * norm_profit +
                   w_trades * norm_trades +
                   w_drawdown * norm_drawdown +
                   w_winrate * norm_winrate +
                   w_sharpe * norm_sharpe)

        return fitness

    def _normalize(self, value, min_val, max_val):
        """Linearly map ``value`` from ``[min_val, max_val]`` onto 0-1, clamped.

        NaN maps to 0; without this guard ``min(1, nan)`` evaluates to 1 and
        an undefined metric would receive the best possible score.
        """
        if value != value:  # NaN is the only value unequal to itself
            return 0
        return max(0, min(1, (value - min_val) / (max_val - min_val)))

    def _evaluate_strategy(self, individual):
        """DEAP evaluation function: return the fitness as a one-element tuple."""
        trades = self._simulate_trades(individual)

        if not trades:
            return (0,)

        total_profit, max_drawdown, win_rate, sharpe_ratio = self._calculate_stats(trades)

        fitness = self._calculate_fitness(
            total_profit=total_profit,
            n_trades=len(trades),
            max_drawdown=max_drawdown,
            win_rate=win_rate,
            sharpe_ratio=sharpe_ratio
        )

        return (fitness,)

    def optimize(self):
        """Run ``algorithms.eaSimple`` and return ``(best_individual, logbook)``."""
        pop = self.toolbox.population(n=self.population_size)
        hof = tools.HallOfFame(1)

        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("max", np.max)
        stats.register("min", np.min)
        stats.register("std", np.std)

        pop, logbook = algorithms.eaSimple(pop, self.toolbox,
                                           cxpb=0.7,   # crossover probability
                                           mutpb=0.2,  # mutation probability
                                           ngen=self.generations,
                                           stats=stats,
                                           halloffame=hof,
                                           verbose=True)

        return hof[0], logbook

    def generate_trading_rules(self, best_individual):
        """Return one human-readable rule dict per price interval."""
        m, h_time, target_profit, alpha = best_individual

        trading_rules = []
        for lower, upper in self._price_intervals(m):
            rule = {
                'interval': (lower, upper),
                'holding_period': h_time,
                'target_profit': target_profit,
                'alpha': alpha,
                'entry_conditions': [
                    f"股價在 {lower:.2f}-{upper:.2f} 區間內",
                    f"預期獲利率 > {target_profit:.2%}",
                    f"信心度 > {alpha:.2%}",
                    "20日均線向上穿越60日均線",
                    "RSI在30-70之間",
                    "MACD向上穿越信號線"
                ],
                'risk_management': [
                    f"止損點設在 {lower:.2f}",
                    "波動率 < 2%",
                    "成交量大於20日平均",
                    "價格位置在近期高低點30%-70%之間"
                ],
                'exit_rules': [
                    f"持有 {h_time} 天後賣出",
                    "或當價格跌破止損點時提前賣出"
                ]
            }
            trading_rules.append(rule)

        return trading_rules

    def backtest(self, individual):
        """Simulate ``individual`` and return ``{'trades': DataFrame, 'stats': dict}``.

        Returns ``None`` when the simulation produces no trades.  Percentages
        in ``stats`` are scaled by 100; the figures come from
        ``_calculate_stats`` so they match what the optimiser scored.
        """
        trades = self._simulate_trades(individual)

        if not trades:
            return None

        trades_df = pd.DataFrame(trades)
        total_profit, max_drawdown, win_rate, sharpe_ratio = self._calculate_stats(trades)

        stats = {
            '總交易次數': len(trades),
            '總收益率': total_profit * 100,
            '平均收益率': trades_df['profit'].mean() * 100,
            '勝率': win_rate * 100,
            '最大回撤率': max_drawdown * 100,
            '夏普比率': sharpe_ratio
        }

        return {
            'trades': trades_df,
            'stats': stats
        }

def main():
    """Optimise the strategy for 2330.TW, then print parameters, rules and backtest."""
    strategy = GeneticTradingStrategy(
        symbol="2330.TW",
        population_size=50,
        generations=30,
    )

    # Run the genetic search.
    print("開始優化策略...")
    best_individual, logbook = strategy.optimize()

    # Derive the rule descriptions and the backtest from the winner.
    trading_rules = strategy.generate_trading_rules(best_individual)
    backtest_results = strategy.backtest(best_individual)

    # Best parameters.
    m, h_time, target_profit, alpha = best_individual
    print("\n最佳參數：")
    print(f"區間個數 (m): {m}")
    print(f"持有期間 (h_time): {h_time} 天")
    print(f"目標獲利 (targetProfit): {target_profit:.2%}")
    print(f"進場門檻 (alpha): {alpha:.2%}")

    # One section per rule; each rule lists three groups of conditions.
    sections = (
        ("進場條件:", 'entry_conditions'),
        ("風險管理:", 'risk_management'),
        ("退場規則:", 'exit_rules'),
    )
    print("\n交易規則：")
    for number, rule in enumerate(trading_rules, 1):
        low, high = rule['interval']
        print(f"\n規則 {number}:")
        print(f"價格區間: {low:.2f} - {high:.2f}")
        for heading, field in sections:
            print(heading)
            for line in rule[field]:
                print(f"  - {line}")

    # Backtest figures are only available when at least one trade occurred.
    if backtest_results:
        print("\n回測結果：")
        for key, value in backtest_results['stats'].items():
            print(f"{key}: {value:.2f}")

if __name__ == "__main__":
    main()


開始優化策略...
gen	nevals	avg	max	min	std
0  	50    	0  	0  	0  	0  
1  	45    	0  	0  	0  	0  
2  	36    	0  	0  	0  	0  
3  	29    	0  	0  	0  	0  
4  	37    	0  	0  	0  	0  
5  	38    	0  	0  	0  	0  
6  	38    	0  	0  	0  	0  
7  	39    	0  	0  	0  	0  
8  	36    	0  	0  	0  	0  
9  	34    	0  	0  	0  	0  
10 	44    	0  	0  	0  	0  
11 	43    	0  	0  	0  	0  
12 	36    	0  	0  	0  	0  
13 	36    	0  	0  	0  	0  
14 	40    	0  	0  	0  	0  
15 	42    	0  	0  	0  	0  
16 	36    	0  	0  	0  	0  
17 	43    	0  	0  	0  	0  
18 	32    	0  	0  	0  	0  
19 	35    	0  	0  	0  	0  
20 	33    	0  	0  	0  	0  
21 	45    	0  	0  	0  	0  
22 	43    	0  	0  	0  	0  
23 	38    	0  	0  	0  	0  
24 	42    	0  	0  	0  	0  
25 	41    	0  	0  	0  	0  
26 	39    	0  	0  	0  	0  
27 	41    	0  	0  	0  	0  
28 	38    	0  	0  	0  	0  
29 	42    	0  	0  	0  	0  
30 	44    	0  	0  	0  	0  

最佳參數：
區間個數 (m): 5
持有期間 (h_time): 8 天
目標獲利 (targetProfit): 10.68%
進場門檻 (alpha): 87.84%

交易規則：

規則 1:
價格區間: 322.47 - 488.90
進場條

遺傳算法參數優化：


In [3]:
class GeneticOptimizer:
    """Genetic-algorithm driver with diversity-based parameter adaptation.

    ``optimize`` relies on ``initialize_population``, ``evolve_population``
    and ``get_best_individual``, which are not defined in this class as
    shown and must be provided elsewhere (for example by a subclass).
    """

    def __init__(self):
        # Base parameters.
        self.POPULATION_SIZE = 100
        self.GENERATIONS = 50
        self.CROSSOVER_PROB = 0.7
        self.MUTATION_PROB = 0.2
        self.TOURNAMENT_SIZE = 3

        # Adaptive parameter tuning.
        self.adaptive_params = True
        self.min_diversity = 0.1

    def calculate_population_diversity(self, population):
        """Return the mean pairwise Euclidean distance between individuals.

        Returns 0.0 for fewer than two individuals: there are no pairs, and
        averaging an empty list would yield NaN plus a RuntimeWarning.
        """
        genes = np.asarray(list(population), dtype=float)
        if len(genes) < 2:
            return 0.0
        distances = [
            np.linalg.norm(g1 - g2)
            for i, g1 in enumerate(genes)
            for g2 in genes[i+1:]
        ]
        return np.mean(distances)

    def adapt_genetic_params(self, diversity, generation, max_gen):
        """Adjust mutation/crossover probabilities and tournament size.

        Args:
            diversity: Value compared against ``self.min_diversity``.
            generation: Index of the current generation.
            max_gen: Total number of generations; the stage-based adjustment
                is skipped when this is not positive.
        """
        if diversity < self.min_diversity:
            # Diversity too low: raise mutation (capped at 0.4) and lower
            # crossover (floored at 0.5).
            self.MUTATION_PROB = min(0.4, self.MUTATION_PROB * 1.5)
            self.CROSSOVER_PROB = max(0.5, self.CROSSOVER_PROB * 0.9)
        else:
            # Restore the defaults.
            self.MUTATION_PROB = 0.2
            self.CROSSOVER_PROB = 0.7

        # Late stage (beyond 70% of the run): increase selection pressure.
        if max_gen > 0 and generation / max_gen > 0.7:
            self.TOURNAMENT_SIZE = 4

    def optimize(self, eval_func, param_ranges):
        """Evolve a population for ``self.GENERATIONS`` rounds and return the best."""
        population = self.initialize_population(param_ranges)

        for gen in range(self.GENERATIONS):
            diversity = self.calculate_population_diversity(population)

            if self.adaptive_params:
                self.adapt_genetic_params(diversity, gen, self.GENERATIONS)

            # Score the current population, then breed the next one.
            fitnesses = [eval_func(ind) for ind in population]
            population = self.evolve_population(population, fitnesses)

        return self.get_best_individual(population, eval_func)

擴展技術指標和交易條件：


In [4]:
class TechnicalIndicators:
    """Collects named indicator results in the ``indicators`` dictionary.

    The ``calculate_*`` helpers called below are not defined in this class
    as shown; they must be supplied elsewhere (for example by a subclass).
    """

    def __init__(self):
        self.indicators = {}

    def add_momentum_indicators(self, data):
        """Store the momentum group: RSI, stochastic K/D, MACD/Signal and ADX."""
        self.indicators['RSI'] = self.calculate_rsi(data['Close'])

        # Stochastic oscillator: helper returns a (K, D) pair.
        stoch_k, stoch_d = self.calculate_stochastic(data)
        self.indicators['Stochastic_K'] = stoch_k
        self.indicators['Stochastic_D'] = stoch_d

        # MACD: helper returns a (MACD, signal) pair.
        macd_line, signal_line = self.calculate_macd(data['Close'])
        self.indicators['MACD'] = macd_line
        self.indicators['Signal'] = signal_line

        # ADX (trend strength).
        self.indicators['ADX'] = self.calculate_adx(data)

    def add_volume_indicators(self, data):
        """Store the volume group: OBV, VWAP and MFI."""
        for key, compute in (
            ('OBV', self.calculate_obv),    # on-balance volume
            ('VWAP', self.calculate_vwap),  # volume-weighted average
            ('MFI', self.calculate_mfi),    # money flow index
        ):
            self.indicators[key] = compute(data)

    def add_volatility_indicators(self, data):
        """Store the volatility group: Bollinger bands, ATR and Keltner channel."""
        # Bollinger bands: helper returns (upper, middle, lower).
        bb_upper, bb_middle, bb_lower = self.calculate_bollinger_bands(data['Close'])
        self.indicators['BB_Upper'] = bb_upper
        self.indicators['BB_Middle'] = bb_middle
        self.indicators['BB_Lower'] = bb_lower

        # ATR (true range based).
        self.indicators['ATR'] = self.calculate_atr(data)

        # Keltner channel: helper returns (upper, middle, lower).
        kc_upper, kc_middle, kc_lower = self.calculate_keltner_channel(data)
        self.indicators['KC_Upper'] = kc_upper
        self.indicators['KC_Middle'] = kc_middle
        self.indicators['KC_Lower'] = kc_lower

class EnhancedTradingConditions:
    """Combines four condition scores into a single weighted entry decision.

    The ``check_*_conditions`` helpers are not defined in this class as
    shown; they must be supplied elsewhere (for example by a subclass).
    """

    def __init__(self):
        self.condition_weights = {
            'trend': 0.3,
            'momentum': 0.2,
            'volume': 0.2,
            'volatility': 0.3
        }

    def check_conditions(self, data, indicators, index):
        """Return True when the weighted sum of the four scores exceeds 0.7."""
        trend = self.check_trend_conditions(data, indicators, index)
        momentum = self.check_momentum_conditions(indicators, index)
        volume = self.check_volume_conditions(data, indicators, index)
        volatility = self.check_volatility_conditions(indicators, index)

        # Accumulate score * weight in the same order the scores were taken.
        weighted_total = 0
        for name, score in (
            ('trend', trend),
            ('momentum', momentum),
            ('volume', volume),
            ('volatility', volatility),
        ):
            weighted_total += score * self.condition_weights[name]

        # Enter only above the 0.7 threshold.
        return weighted_total > 0.7

風險管理策略優化：


In [5]:
class RiskManagement:
    """Position sizing and open-position checks driven by fixed risk parameters.

    ``calculate_var_adjusted_size``, ``check_stop_loss``,
    ``check_trailing_stop`` and ``check_profit_target`` are not defined in
    this class as shown and must be provided elsewhere.
    """

    def __init__(self):
        # Basic risk parameters.
        self.MAX_POSITION_SIZE = 0.1
        self.MAX_DRAWDOWN = 0.1
        self.STOP_LOSS = 0.05
        self.TRAILING_STOP = 0.03

        # Volatility adjustment parameters.
        self.VOLATILITY_MULTIPLIER = 2
        self.MAX_VOLATILITY = 0.03

        # Money management parameters.
        self.KELLY_FRACTION = 0.5
        self.RISK_PER_TRADE = 0.01

    def calculate_position_size(self, capital, price, volatility, win_rate, avg_win, avg_loss):
        """Return the capital to allocate: the smallest of three size limits.

        The limits are the Kelly fraction, the volatility-scaled size and the
        VaR-adjusted size; the minimum is multiplied by ``KELLY_FRACTION``
        and by ``capital``.

        Raises:
            ValueError: If ``volatility`` is negative.
        """
        if volatility < 0:
            raise ValueError("volatility must be non-negative")

        kelly_size = self.calculate_kelly_size(win_rate, avg_win, avg_loss)

        # Zero volatility makes the volatility limit unbounded, so it simply
        # does not constrain the minimum below (instead of dividing by zero).
        if volatility > 0:
            vol_adj_size = self.MAX_POSITION_SIZE * (self.MAX_VOLATILITY / volatility)
        else:
            vol_adj_size = float("inf")

        var_adj_size = self.calculate_var_adjusted_size(price, volatility)

        position_size = min(kelly_size, vol_adj_size, var_adj_size)
        position_size *= self.KELLY_FRACTION  # conservative scaling

        return position_size * capital

    def calculate_kelly_size(self, win_rate, avg_win, avg_loss):
        """Return the Kelly fraction ``p - (1 - p) / (avg_win / |avg_loss|)``, floored at 0.

        Returns 0 when ``avg_loss`` is 0 or when the win/loss ratio is not
        positive (the formula would otherwise divide by zero).
        """
        if avg_loss == 0:
            return 0
        ratio = avg_win / abs(avg_loss)
        if ratio <= 0:
            return 0
        kelly = win_rate - (1 - win_rate) / ratio
        return max(0, kelly)

    def manage_existing_positions(self, positions, current_prices, market_conditions):
        """Return one action for the portfolio: "close", "reduce", "hedge" or "hold".

        Positions are examined in order and the first check that fires
        decides the result; "hold" is returned when nothing fires (including
        for an empty ``positions``).
        """
        for pos in positions:
            # Stop-loss hit.
            if self.check_stop_loss(pos, current_prices):
                return "close"

            # Trailing stop hit.
            if self.check_trailing_stop(pos, current_prices):
                return "close"

            # Profit target reached.
            if self.check_profit_target(pos, current_prices):
                return "reduce"

            # Market regime flagged as changing.
            if market_conditions['trend_changing']:
                return "hedge"

        return "hold"

策略穩定性評估：


In [6]:
class StrategyEvaluator:
    """Runs four stability tests against a strategy and gathers their scores.

    The classification, backtesting, data-generation and scoring helpers
    called below are not defined in this class as shown; they must be
    supplied elsewhere (for example by a subclass).
    """

    def __init__(self):
        self.evaluation_periods = ['1Y', '2Y', '5Y']
        self.monte_carlo_sims = 1000

    def evaluate_strategy_stability(self, strategy, data):
        """Return a dict with robustness, consistency, reliability and adaptability."""
        robustness = self.test_robustness(strategy, data)
        consistency = self.test_consistency(strategy, data)
        reliability = self.test_reliability(strategy, data)
        adaptability = self.test_adaptability(strategy, data)
        return {
            'robustness': robustness,
            'consistency': consistency,
            'reliability': reliability,
            'adaptability': adaptability
        }

    def test_robustness(self, strategy, data):
        """Backtest on each market-condition subset and score the results."""
        selectors = self.classify_market_conditions(data)

        # ``selectors[name]`` is used to index ``data`` for that condition.
        performance = {
            name: self.backtest_strategy(strategy, data[selectors[name]])
            for name in selectors
        }

        return self.calculate_robustness_score(performance)

    def test_consistency(self, strategy, data):
        """Backtest on each evaluation period and score the results."""
        by_period = {
            period: self.backtest_strategy(strategy, self.get_period_data(data, period))
            for period in self.evaluation_periods
        }

        return self.calculate_consistency_score(by_period)

    def test_reliability(self, strategy, data):
        """Backtest on ``monte_carlo_sims`` simulated datasets and score the results."""
        outcomes = [
            self.backtest_strategy(strategy, self.generate_monte_carlo_data(data))
            for _ in range(self.monte_carlo_sims)
        ]

        return self.calculate_reliability_score(outcomes)

    def test_adaptability(self, strategy, data):
        """Return the mean adaptation score over the identified transition points.

        For each transition the strategy is backtested on the data before
        and after that point.
        """
        scores = [
            self.calculate_adaptation_score(
                self.backtest_strategy(strategy, data[:split]),
                self.backtest_strategy(strategy, data[split:]),
            )
            for split in self.identify_market_transitions(data)
        ]

        return np.mean(scores)

優化遺傳算法的收斂速度和解的質量：



In [7]:
class EnhancedGeneticOptimizer:
    """Holds adaptive crossover/mutation rates and a convergence test."""

    def __init__(self):
        # Base parameters.
        self.POPULATION_SIZE = 100
        self.GENERATIONS = 50
        self.ELITISM_RATIO = 0.1  # share of top individuals to keep

        # Each rate carries its bounds and its current value.
        self.adaptive_rates = {
            'crossover': {'min': 0.6, 'max': 0.9, 'current': 0.7},
            'mutation': {'min': 0.1, 'max': 0.4, 'current': 0.2}
        }

    def adapt_rates(self, diversity, progress):
        """Scale the current rates from ``diversity`` and ``progress``, then clip them."""
        mutation = self.adaptive_rates['mutation']
        crossover = self.adaptive_rates['crossover']

        if diversity < 0.1:
            # Low diversity: more mutation, less crossover.
            mutation['current'] *= 1.2
            crossover['current'] *= 0.9
        elif diversity > 0.5:
            # High diversity: less mutation, more crossover.
            mutation['current'] *= 0.9
            crossover['current'] *= 1.1

        # Late stage (beyond 80% progress): boost mutation sharply.
        if progress > 0.8:
            mutation['current'] *= 1.5

        # Force every rate back inside its own [min, max] bounds.
        for rate in self.adaptive_rates.values():
            rate['current'] = np.clip(rate['current'], rate['min'], rate['max'])

    def convergence_check(self, population_fitness, tolerance=1e-6, window=5):
        """Return True when the last ``window`` values differ by less than ``tolerance``.

        Returns False while fewer than ``window`` values are available.
        """
        if len(population_fitness) >= window:
            tail = population_fitness[-window:]
            spread = max(tail) - min(tail)
            return spread < tolerance
        return False

提供更全面的交易信號：



In [8]:
class CompositeSignalGenerator:
    """Blends per-category signals into one weighted composite signal.

    ``analyze_volume``, ``analyze_volatility`` and ``analyze_fundamental``
    are not defined in this class as shown and must be provided elsewhere.
    """

    def __init__(self):
        self.signal_weights = {
            'trend': 0.3,
            'momentum': 0.2,
            'volume': 0.2,
            'volatility': 0.15,
            'fundamental': 0.15
        }

    @staticmethod
    def _value_at(series, idx):
        """Return the element of ``series`` addressed by ``idx``.

        An integer ``idx`` on a non-numeric index (e.g. dates) is treated as
        a position; anything else is treated as a label.  This is what
        ``series[idx]`` historically did, spelled out explicitly because the
        positional fallback of ``Series.__getitem__`` is deprecated.
        """
        is_integer = isinstance(idx, (int, np.integer)) and not isinstance(idx, bool)
        if is_integer and not pd.api.types.is_numeric_dtype(series.index):
            return series.iloc[idx]
        return series.loc[idx]

    def generate_signals(self, data, current_idx):
        """Return ``(composite_signal, signals)`` for ``current_idx``.

        ``signals`` maps each category name to its raw signal; the composite
        is the sum of each signal times its weight.
        """
        signals = {}

        signals['trend'] = self.analyze_trend(data, current_idx)
        signals['momentum'] = self.analyze_momentum(data, current_idx)
        signals['volume'] = self.analyze_volume(data, current_idx)
        signals['volatility'] = self.analyze_volatility(data, current_idx)
        signals['fundamental'] = self.analyze_fundamental(data, current_idx)

        composite_signal = sum(signal * self.signal_weights[key]
                               for key, signal in signals.items())

        return composite_signal, signals

    def analyze_trend(self, data, idx):
        """Return the relative gap between the 20- and 60-bar close averages at ``idx``."""
        sma_short = self._value_at(data['Close'].rolling(20).mean(), idx)
        sma_long = self._value_at(data['Close'].rolling(60).mean(), idx)
        return (sma_short - sma_long) / sma_long

    def analyze_momentum(self, data, idx):
        """Return ``(RSI - 50) / 50 + MACD`` at ``idx``.

        Reads the ``RSI`` and ``MACD`` columns of ``data``.
        """
        rsi = self._value_at(data['RSI'], idx)
        macd = self._value_at(data['MACD'], idx)
        return (rsi - 50) / 50 + macd

改進風險控制：



In [9]:
class RiskController:
    """Dynamic position sizing and stop-loss placement from current risk readings."""

    def __init__(self):
        self.risk_limits = {
            'position_size': 0.1,    # maximum position ratio
            'drawdown': 0.15,        # maximum drawdown limit
            'daily_loss': 0.02,      # maximum single-day loss
            'var_limit': 0.03        # value-at-risk limit
        }

    @staticmethod
    def _volatility_ratio(current_risk):
        """Return current volatility relative to its average.

        Returns 1.0 (no adjustment) when the average is not positive, since
        the ratio is undefined without a baseline.
        """
        average = current_risk['avg_volatility']
        if average <= 0:
            return 1.0
        return current_risk['volatility'] / average

    def calculate_position_size(self, strategy_signal, current_risk):
        """Return the position ratio, never above ``risk_limits['position_size']``.

        Args:
            strategy_signal: Signal whose absolute value scales the size.
            current_risk: Mapping with ``drawdown``, ``volatility`` and
                ``avg_volatility``.
        """
        base_size = self.risk_limits['position_size']
        risk_factor = 1.0

        # Halve the size once drawdown passes 70% of its limit.
        if current_risk['drawdown'] > self.risk_limits['drawdown'] * 0.7:
            risk_factor *= 0.5

        # Scale inversely with relative volatility; a zero ratio would divide
        # by zero, so it is treated as "no adjustment".
        vol_ratio = self._volatility_ratio(current_risk)
        if vol_ratio > 0:
            risk_factor *= 1 / vol_ratio

        # Scale with signal strength.
        risk_factor *= abs(strategy_signal)

        # ``position_size`` is the maximum ratio, so quiet markets or strong
        # signals must not push the result beyond it.
        return base_size * min(risk_factor, 1.0)

    def generate_stop_loss(self, entry_price, current_risk):
        """Return a stop price below ``entry_price``, widened by relative volatility.

        The base distance is 2% of the entry price, multiplied by the
        current-to-average volatility ratio.
        """
        base_stop = 0.02
        vol_adjusted_stop = base_stop * self._volatility_ratio(current_risk)
        return entry_price * (1 - vol_adjusted_stop)

確保策略穩定性：



In [10]:
class StrategyStabilityAnalyzer:
    """Scores strategy stability across periods, environments and parameters.

    The splitting, classification, backtesting and scoring helpers called
    below are not defined in this class as shown; they must be supplied
    elsewhere (for example by a subclass).
    """

    def __init__(self):
        self.stability_metrics = {
            name: []
            for name in ('sharpe_ratio', 'max_drawdown', 'win_rate', 'profit_factor')
        }

    def analyze_stability(self, strategy, data):
        """Return the stability score built from the three analyses below."""
        return self.calculate_stability_score(
            self.period_analysis(strategy, data),        # time-period test
            self.environment_analysis(strategy, data),   # market-environment test
            self.parameter_sensitivity(strategy, data),  # parameter-sensitivity test
        )

    def period_analysis(self, strategy, data):
        """Backtest each period produced by ``split_data_into_periods`` and score them."""
        outcomes = [
            self.backtest_strategy(strategy, chunk)
            for chunk in self.split_data_into_periods(data)
        ]
        return self.calculate_period_stability(outcomes)

    def environment_analysis(self, strategy, data):
        """Backtest each environment from ``classify_market_environments`` and score them."""
        outcomes = {
            label: self.backtest_strategy(strategy, subset)
            for label, subset in self.classify_market_environments(data).items()
        }
        return self.calculate_environment_stability(outcomes)

In [11]:
pip install pandas numpy pandas-ta yfinance

Collecting pandas-ta
  Downloading pandas_ta-0.3.14b.tar.gz (115 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.1/115.1 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pandas-ta
  Building wheel for pandas-ta (setup.py) ... [?25l[?25hdone
  Created wheel for pandas-ta: filename=pandas_ta-0.3.14b0-py3-none-any.whl size=218910 sha256=22c714bd6cb0921a7806f74486413436162c98ee2b01cde1ecd99a6882c46fe7
  Stored in directory: /root/.cache/pip/wheels/7f/33/8b/50b245c5c65433cd8f5cb24ac15d97e5a3db2d41a8b6ae957d
Successfully built pandas-ta
Installing collected packages: pandas-ta
Successfully installed pandas-ta-0.3.14b0


In [12]:
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
from dataclasses import dataclass
from datetime import datetime
import yfinance as yf

@dataclass
class TradingParameters:
    """Trading parameter configuration consumed by the backtesting classes below."""
    # Ticker symbol; MarketData passes it to yf.Ticker and it keys the positions.
    symbol: str
    # Bar interval; MarketData passes it as the `interval` argument of history().
    timeframe: str
    # Starting cash handed to PortfolioManager.
    initial_capital: float
    # Size passed to PortfolioManager.execute_trade for every order.
    position_size: float
    # Fractional drop from the entry price at which RiskManager reports a stop loss.
    stop_loss: float
    # Fractional rise from the entry price at which RiskManager reports a take profit.
    take_profit: float

class MarketData:
    """Download price history for one symbol and derive technical indicators."""

    def __init__(self, symbol: str, timeframe: str):
        self.symbol = symbol
        self.timeframe = timeframe

    def fetch_data(self, start_date: str, end_date: str) -> pd.DataFrame:
        """Fetch price history from yfinance and return it with indicators added.

        Args:
            start_date: Start of the requested range, forwarded to ``history``.
            end_date: End of the requested range, forwarded to ``history``.

        Returns:
            The frame produced by ``_prepare_data``.
        """
        ticker = yf.Ticker(self.symbol)
        data = ticker.history(start=start_date, end=end_date, interval=self.timeframe)
        return self._prepare_data(data)

    def _prepare_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return a new frame with SMA20, SMA50, RSI and ATR columns added.

        The input must provide ``Close``, ``High`` and ``Low`` columns. It is
        left untouched: the indicators are written to a copy so the caller's
        frame does not silently gain columns. Rows where any column is NaN
        (the indicator warm-up rows, among others) are dropped.
        """
        # Work on a copy so the caller's DataFrame is not mutated.
        data = data.copy()

        # Indicators are built from pandas primitives rather than pandas_ta.
        data['SMA20'] = data['Close'].rolling(window=20).mean()
        data['SMA50'] = data['Close'].rolling(window=50).mean()

        # RSI based on 14-row simple averages of gains and losses.
        delta = data['Close'].diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        avg_gain = gain.rolling(window=14).mean()
        avg_loss = loss.rolling(window=14).mean()
        rs = avg_gain / avg_loss
        data['RSI'] = 100 - (100 / (1 + rs))

        # ATR: 14-row mean of the true range (largest of the three spreads).
        high_low = data['High'] - data['Low']
        high_close = (data['High'] - data['Close'].shift()).abs()
        low_close = (data['Low'] - data['Close'].shift()).abs()
        ranges = pd.concat([high_low, high_close, low_close], axis=1)
        true_range = ranges.max(axis=1)
        data['ATR'] = true_range.rolling(window=14).mean()

        # Drop rows containing NaN values.
        return data.dropna()

class TradingStrategy:
    """Base trading strategy: emits signals from SMA20/SMA50 crossovers."""

    def __init__(self, params: TradingParameters):
        self.params = params
        self.position = 0
        self.entry_price = 0

    def generate_signals(self, data: pd.DataFrame) -> pd.Series:
        """Return a series aligned with ``data`` holding 1 (buy), -1 (sell) or 0.

        A buy is flagged on a row where SMA20 is above SMA50 while on the
        previous row it was at or below it; a sell is the mirror image. The
        first row never carries a signal because it has no predecessor, and
        any comparison involving NaN counts as false.
        """
        fast = data['SMA20'].to_numpy()
        slow = data['SMA50'].to_numpy()

        # State of the previous row, shifted forward by one; row 0 stays False.
        was_at_or_below = np.zeros(len(fast), dtype=bool)
        was_at_or_above = np.zeros(len(fast), dtype=bool)
        was_at_or_below[1:] = fast[:-1] <= slow[:-1]
        was_at_or_above[1:] = fast[:-1] >= slow[:-1]

        crossed_up = (fast > slow) & was_at_or_below
        crossed_down = (fast < slow) & was_at_or_above

        signals = pd.Series(index=data.index, data=0)
        signals.iloc[np.flatnonzero(crossed_up)] = 1  # buy signal
        signals.iloc[np.flatnonzero(crossed_down)] = -1  # sell signal
        return signals

class PortfolioManager:
    """Track cash, open positions and the executed-trade log.

    Args:
        initial_capital: Starting cash.
        commission_rate: Fraction of the traded notional charged per trade
            (default 0.001, i.e. 0.1%).
    """

    def __init__(self, initial_capital: float, commission_rate: float = 0.001):
        self.initial_capital = initial_capital
        self.current_capital = initial_capital
        self.commission_rate = commission_rate
        self.positions = {}
        self.trades = []

    def execute_trade(self, symbol: str, signal: int, price: float, size: float,
                     timestamp: datetime) -> Dict:
        """Execute a buy (``signal > 0``) or a sell (``signal < 0``).

        A buy purchases ``size`` units if cash covers cost plus commission.
        When a position in ``symbol`` is already open the new units are added
        to it and the entry price becomes the size-weighted average, so
        earlier holdings are no longer discarded.

        A sell always liquidates the entire open position; ``size`` is ignored
        for sells, and both the commission and the recorded size are based on
        the quantity actually sold.

        Returns:
            The trade record that was appended to ``self.trades``, or an empty
            dict when nothing was executed (zero signal, insufficient cash,
            or no position to sell).
        """
        if signal == 0:
            return {}

        if signal > 0:  # buy
            cost = price * size
            commission = cost * self.commission_rate
            # Written as `not (>=)` so a NaN price is rejected rather than bought.
            if not (self.current_capital >= (cost + commission)):
                return {}

            held = self.positions.get(symbol)
            if held is None:
                self.positions[symbol] = {'size': size, 'entry_price': price}
            else:
                combined = held['size'] + size
                average_entry = (
                    (held['entry_price'] * held['size'] + cost) / combined
                    if combined else price
                )
                self.positions[symbol] = {'size': combined, 'entry_price': average_entry}

            self.current_capital -= (cost + commission)
            trade_type = 'BUY'
            traded_size = size
        else:  # sell
            position = self.positions.pop(symbol, None)
            if position is None:
                return {}
            traded_size = position['size']
            revenue = price * traded_size
            commission = revenue * self.commission_rate
            self.current_capital += (revenue - commission)
            trade_type = 'SELL'

        trade = {
            'timestamp': timestamp,
            'symbol': symbol,
            'type': trade_type,
            'price': price,
            'size': traded_size,
            'commission': commission
        }
        self.trades.append(trade)
        return trade

class RiskManager:
    """Stop-loss / take-profit checks on tracked entry prices."""

    def __init__(self, params: TradingParameters):
        self.params = params
        self.positions = {}

    def check_position(self, symbol: str, current_price: float) -> Tuple[bool, str]:
        """Report whether the tracked position in ``symbol`` may stay open.

        Returns ``(True, "")`` when the symbol is not tracked or the price is
        strictly between both thresholds; otherwise ``(False, reason)``. The
        stop loss is tested before the take profit.
        """
        try:
            entry_price = self.positions[symbol]['entry_price']
        except KeyError:
            return True, ""

        stop_level = entry_price * (1 - self.params.stop_loss)
        if current_price <= stop_level:
            return False, "Stop Loss Triggered"

        target_level = entry_price * (1 + self.params.take_profit)
        if current_price >= target_level:
            return False, "Take Profit Triggered"

        return True, ""

    def update_position(self, symbol: str, entry_price: float):
        """Start (or restart) tracking ``symbol`` at the given entry price."""
        self.positions[symbol] = {'entry_price': entry_price}

    def close_position(self, symbol: str):
        """Stop tracking ``symbol``; does nothing if it is not tracked."""
        self.positions.pop(symbol, None)

class TradingSystem:
    """Main trading system: wires data, strategy, portfolio and risk checks."""

    def __init__(self, params: TradingParameters):
        self.params = params
        self.market_data = MarketData(params.symbol, params.timeframe)
        self.strategy = TradingStrategy(params)
        self.portfolio = PortfolioManager(params.initial_capital)
        self.risk_manager = RiskManager(params)
        # Close of the last bar processed by backtest(); used to value any
        # position that is still open when results are computed.
        self._last_close = None

    def backtest(self, start_date: str, end_date: str) -> Dict:
        """Run the strategy bar by bar over the date range and return the results.

        On every bar the risk manager is consulted first. If it reports a
        stop-loss or take-profit, the position is sold at that bar's close and
        the strategy signal for that bar is ignored. Otherwise a non-zero
        signal is forwarded to the portfolio at the bar's close.

        Returns:
            The dictionary built by ``_calculate_results``.
        """
        data = self.market_data.fetch_data(start_date, end_date)
        signals = self.strategy.generate_signals(data)

        symbol = self.params.symbol
        size = self.params.position_size
        closes = data['Close'].to_numpy()
        self._last_close = closes[-1] if len(closes) else None

        for timestamp, close, signal in zip(data.index, closes, signals.to_numpy()):
            can_trade, _reason = self.risk_manager.check_position(symbol, close)

            if not can_trade:
                # Risk exit: force a sell of the open position.
                trade = self.portfolio.execute_trade(symbol, -1, close, size, timestamp)
                if trade:
                    self.risk_manager.close_position(symbol)
            elif signal != 0:
                trade = self.portfolio.execute_trade(symbol, signal, close, size, timestamp)
                if trade and trade['type'] == 'BUY':
                    self.risk_manager.update_position(symbol, close)
                elif trade and trade['type'] == 'SELL':
                    self.risk_manager.close_position(symbol)

        return self._calculate_results()

    def _calculate_results(self) -> Dict:
        """Summarise the trade log.

        Returns a dict with:
            total_trades: number of completed buy/sell pairs.
            profit_loss: final capital minus initial capital.
            win_rate: share of pairs whose sell price exceeds the buy price
                (commissions are not considered here).
            final_capital: cash plus any still-open positions valued at the
                last close seen by ``backtest`` (no exit commission applied).
                Without open positions this is simply the cash balance.
        """
        portfolio = self.portfolio
        trades = pd.DataFrame(portfolio.trades)
        if trades.empty:
            return {
                'total_trades': 0,
                'profit_loss': 0,
                'win_rate': 0,
                'final_capital': portfolio.current_capital
            }

        # Mark open positions to market; otherwise a backtest that ends while
        # holding shares would report their whole purchase cost as a loss.
        final_capital = portfolio.current_capital
        if self._last_close is not None:
            final_capital = final_capital + sum(
                held['size'] * self._last_close
                for held in portfolio.positions.values()
            )

        buy_trades = trades[trades['type'] == 'BUY']
        sell_trades = trades[trades['type'] == 'SELL']

        # Pair buys and sells in order; an unmatched trailing trade is ignored.
        min_trades = min(len(buy_trades), len(sell_trades))

        if min_trades > 0:
            buy_prices = buy_trades['price'].iloc[:min_trades].values
            sell_prices = sell_trades['price'].iloc[:min_trades].values
            profits = sell_prices - buy_prices
            win_rate = np.sum(profits > 0) / len(profits)
        else:
            win_rate = 0

        return {
            'total_trades': min_trades,
            'profit_loss': final_capital - portfolio.initial_capital,
            'win_rate': win_rate,
            'final_capital': final_capital
        }

def main():
    """Backtest the AAPL daily configuration over 2023 and print a summary."""
    config = TradingParameters(
        symbol='AAPL',
        timeframe='1d',
        initial_capital=100000,
        position_size=100,
        stop_loss=0.02,
        take_profit=0.05,
    )

    outcome = TradingSystem(config).backtest(
        start_date='2023-01-01',
        end_date='2023-12-31',
    )

    report_lines = (
        "\n=== Backtest Results ===",
        f"Total Trades: {outcome['total_trades']}",
        f"Profit/Loss: ${outcome['profit_loss']:.2f}",
        f"Win Rate: {outcome['win_rate']:.2%}",
        f"Final Capital: ${outcome['final_capital']:.2f}",
    )
    for line in report_lines:
        print(line)


if __name__ == "__main__":
    main()


=== Backtest Results ===
Total Trades: 1
Profit/Loss: $1006.09
Win Rate: 100.00%
Final Capital: $101006.09


In [13]:
import yfinance as yf
import numpy as np
import pandas as pd
from deap import base, creator, tools, algorithms
import random
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import warnings
warnings.filterwarnings('ignore')

@dataclass
class TradingParameters:
    """(low, high) ranges chosen by GeneticTradingStrategy._initialize_parameters.

    The field names mirror the four genes unpacked from an individual
    (m, h_time, target_profit, alpha); presumably the ranges bound those
    genes when the toolbox is set up -- confirm against _setup_toolbox.
    """
    # Range for m, the number of price intervals; picked from the volatility tier.
    M_RANGE: Tuple[int, int]
    # Range for h_time, the number of rows between entry and exit; picked from the volatility tier.
    H_TIME_RANGE: Tuple[int, int]
    # Range for the target profit; set to (2.5 * volatility, 10 * volatility).
    TARGET_PROFIT_RANGE: Tuple[float, float]
    # Range for alpha; depends on whether average volume exceeds 1,000,000.
    ALPHA_RANGE: Tuple[float, float]

class GeneticTradingStrategy:
    def __init__(self, symbol: str, period: str = "5y", population_size: int = 50, generations: int = 30) -> None:
        """Load price data for ``symbol`` and prepare everything the optimiser needs.

        Args:
            symbol: Ticker symbol passed to yfinance.
            period: History period passed to yfinance (default "5y").
            population_size: Population size used by ``optimize``.
            generations: Number of generations used by ``optimize``.

        The steps below depend on each other and must run in this order: the
        metrics need the loaded data, and the parameter ranges need the metrics.
        """
        self.symbol = symbol
        self.period = period
        self.population_size = population_size
        self.generations = generations
        # Downloads the price history (network access).
        self.data = self._load_data()

        # Calculate market characteristics once
        self.market_metrics = self._calculate_market_metrics()
        self.params = self._initialize_parameters()
        self.toolbox = self._setup_toolbox()

        # Pre-calculate commonly used values
        # (adds the indicator columns to self.data in place)
        self._cache_technical_indicators()

    def _calculate_market_metrics(self) -> Dict[str, float]:
        """Calculate key market metrics once during initialization"""
        returns = self.data['Close'].pct_change()
        return {
            'volatility': returns.std(),
            'avg_volume': self.data['Volume'].mean(),
            'price_range': self.data['High'].max() - self.data['Low'].min(),
            'min_price': self.data['Low'].min()
        }

    def _initialize_parameters(self) -> TradingParameters:
        """Derive the parameter ranges from ``self.market_metrics``.

        Volatility selects one of three tiers (below 0.01, below 0.02,
        otherwise) for the interval-count and holding-time ranges; the
        target-profit range scales with volatility; the alpha range depends
        on whether average volume exceeds 1,000,000.
        """
        volatility = self.market_metrics['volatility']
        avg_volume = self.market_metrics['avg_volume']

        if volatility < 0.01:
            m_range, holding_range = (2, 7), (10, 40)
        elif volatility < 0.02:
            m_range, holding_range = (3, 10), (5, 30)
        else:
            # Also reached when volatility is NaN, since both tests are false.
            m_range, holding_range = (5, 15), (3, 15)

        if avg_volume > 1000000:
            alpha_range = (0.6, 0.9)
        else:
            alpha_range = (0.5, 0.8)

        return TradingParameters(
            M_RANGE=m_range,
            H_TIME_RANGE=holding_range,
            TARGET_PROFIT_RANGE=(volatility * 2.5, volatility * 10),
            ALPHA_RANGE=alpha_range,
        )

    def _load_data(self) -> pd.DataFrame:
        """Download the price history of ``self.symbol`` for ``self.period``.

        The frame is returned exactly as yfinance delivers it; indicator
        columns are added later by ``_cache_technical_indicators``.
        """
        return yf.Ticker(self.symbol).history(period=self.period)

    def _cache_technical_indicators(self) -> None:
        """Pre-calculate and cache all technical indicators using vectorized operations"""
        # SMA calculations
        self.data['SMA20'] = self.data['Close'].rolling(window=20).mean()
        self.data['SMA60'] = self.data['Close'].rolling(window=60).mean()

        # RSI calculation
        delta = self.data['Close'].diff()
        gain = delta.where(delta > 0, 0).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        self.data['RSI'] = 100 - (100 / (1 + rs))

        # MACD calculation
        exp1 = self.data['Close'].ewm(span=12, adjust=False).mean()
        exp2 = self.data['Close'].ewm(span=26, adjust=False).mean()
        self.data['MACD'] = exp1 - exp2
        self.data['Signal'] = self.data['MACD'].ewm(span=9, adjust=False).mean()

        # Pre-calculate volume metrics
        self.data['Volume_MA20'] = self.data['Volume'].rolling(window=20).mean()

        # Pre-calculate price positions
        self.data['High_20'] = self.data['High'].rolling(window=20).max()
        self.data['Low_20'] = self.data['Low'].rolling(window=20).min()
        price_range = self.data['High_20'] - self.data['Low_20']
        self.data['Price_Position'] = (self.data['Close'] - self.data['Low_20']) / price_range

    def _evaluate_strategy(self, individual: List) -> Tuple[float]:
        """Optimized strategy evaluation using vectorized operations"""
        trades = self._simulate_trades_vectorized(individual)

        if not trades or trades.empty:
            return (0,)

        # Calculate metrics using vectorized operations
        total_profit = trades['profit'].sum()
        win_rate = (trades['profit'] > 0).mean()

        # Calculate max drawdown efficiently
        equity_curve = trades['profit'].cumsum()
        rolling_max = np.maximum.accumulate(equity_curve)
        drawdowns = (rolling_max - equity_curve) / rolling_max
        max_drawdown = drawdowns.max()

        # Calculate Sharpe ratio
        sharpe_ratio = ((trades['profit'].mean() - 0.004) / trades['profit'].std()) if len(trades) > 1 else 0

        # Combined fitness score
        fitness = self._calculate_fitness(
            total_profit=total_profit,
            n_trades=len(trades),
            max_drawdown=max_drawdown,
            win_rate=win_rate,
            sharpe_ratio=sharpe_ratio
        )

        return (fitness,)

    def _simulate_trades_vectorized(self, individual: List) -> pd.DataFrame:
        """Vectorized trade simulation for improved performance"""
        m, h_time, target_profit, alpha = individual

        # Calculate price intervals
        interval_length = self.market_metrics['price_range'] / m
        intervals = [(self.market_metrics['min_price'] + i * interval_length,
                     self.market_metrics['min_price'] + (i + 1) * interval_length)
                    for i in range(m)]

        trades_list = []

        # Use boolean indexing for conditions
        for i in range(60, len(self.data) - h_time):
            current_price = self.data['Close'].iloc[i]
            future_price = self.data['Close'].iloc[i + h_time]

            # Find applicable interval
            for lower, upper in intervals:
                if lower <= current_price < upper:
                    potential_profit = (future_price - current_price) / current_price

                    if self._check_trading_conditions(i, current_price, potential_profit, target_profit, alpha):
                        trades_list.append({
                            'entry_date': self.data.index[i],
                            'exit_date': self.data.index[i + h_time],
                            'entry_price': current_price,
                            'exit_price': future_price,
                            'profit': potential_profit
                        })
                    break

        return pd.DataFrame(trades_list)

    def _check_trading_conditions(self, i: int, current_price: float,
                                potential_profit: float, target_profit: float,
                                alpha: float) -> bool:
        """Optimized trading conditions check using cached indicators"""
        # Basic profit check
        if potential_profit <= target_profit or potential_profit <= alpha:
            return False

        # Technical conditions using pre-calculated indicators
        if not (self.data['SMA20'].iloc[i] > self.data['SMA60'].iloc[i] and
                self.data['Close'].iloc[i] > self.data['SMA20'].iloc[i]):
            return False

        if not (30 < self.data['RSI'].iloc[i] < 70):
            return False

        if not (self.data['MACD'].iloc[i] > self.data['Signal'].iloc[i] and
                self.data['MACD'].iloc[i-1] <= self.data['Signal'].iloc[i-1]):
            return False

        # Risk conditions using pre-calculated metrics
        if not (self.data['Volume'].iloc[i] > self.data['Volume_MA20'].iloc[i]):
            return False

        if not (0.3 < self.data['Price_Position'].iloc[i] < 0.7):
            return False

        return True

    def optimize(self) -> Tuple[List, tools.Logbook]:
        """Run the genetic algorithm and return the best individual found.

        A population of ``self.population_size`` individuals is evolved with
        ``algorithms.eaSimple`` for ``self.generations`` generations
        (crossover probability 0.7, mutation probability 0.2), printing
        per-generation statistics.

        Returns:
            The single entry kept by the hall of fame and the logbook
            returned by ``eaSimple``.
        """
        population = self.toolbox.population(n=self.population_size)
        best_keeper = tools.HallOfFame(1)

        # Per-generation statistics over the fitness values.
        fitness_stats = tools.Statistics(lambda ind: ind.fitness.values)
        for label, reducer in (("avg", np.mean), ("max", np.max),
                               ("min", np.min), ("std", np.std)):
            fitness_stats.register(label, reducer)

        _, logbook = algorithms.eaSimple(
            population,
            self.toolbox,
            cxpb=0.7,
            mutpb=0.2,
            ngen=self.generations,
            stats=fitness_stats,
            halloffame=best_keeper,
            verbose=True,
        )

        return best_keeper[0], logbook