<a href="https://colab.research.google.com/github/tina287/fianceHomework/blob/main/%E9%87%91%E8%9E%8D%E5%A4%A7%E6%95%B8%E6%93%9A_%E5%9F%BA%E5%9B%A0%E6%BC%94%E7%AE%97%E6%B3%95.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [32]:
import yfinance as yf

def fetch_stock_data(symbols, start_date, end_date):
    # 儲存每隻股票的年化報酬率與年化波動率
    stock_returns = []

    # 迭代所有股票
    for symbol in symbols:
        data = yf.download(symbol + '.TW', start=start_date, end=end_date)['Adj Close']

        # 計算年化報酬率
        annual_return = calculate_annual_return(data)

        # 加入至清單
        stock_returns.append(annual_return)
    stock_returns=pd.DataFrame(np.array(stock_returns))

    # 回傳每支股票的年化報酬清單、年化波動率
    return stock_returns

# 定義計算年化回報率的輔助函數
def calculate_annual_return(data):
    # 計算總期間的年數
    total_days = (data.index[-1] - data.index[0]).days
    years = total_days / 365.0

    # 初始值
    start_value = data.iloc[0]
    # 終值
    end_value = data.iloc[-1]
    return (end_value / start_value) ** (1 / years) - 1

In [33]:
import numpy as np
import pandas as pd

class GeneticPortfolioOptimizer:
    def __init__(self, stock_returns, population_size=100, generations=50, mutation_rate=0.01):
        """
        初始化遺傳演算法投資組合優化器

        參數:
        - stock_returns: 股票歷史回報率的DataFrame
        - population_size: 族群大小
        - generations: 演化代數
        - mutation_rate: 基因突變率
        """
        self.stock_returns = stock_returns
        self.num_stocks = stock_returns.shape[1]
        self.population_size = population_size
        self.generations = generations
        self.mutation_rate = mutation_rate

    def initialize_population(self):
        """
        隨機初始化初始族群，並保證每個染色體至少選擇一部分股票
        """
        population = np.random.randint(2, size=(self.population_size, self.num_stocks))
        # 確保每個染色體至少有 1 個基因為 1
        for chromosome in population:
            if np.sum(chromosome) == 0:  # 如果全為 0
                chromosome[np.random.randint(self.num_stocks)] = 1  # 隨機選擇一支股票
        return population

    def calculate_portfolio_metrics(self, chromosome):
        """
        計算投資組合的回報率和風險

        參數:
        - chromosome: 代表投資組合的二進制陣列

        返回:
        - 投資組合回報率
        - 投資組合風險（標準差）
        """
        # 選擇被選中的股票
        selected_stocks = self.stock_returns.iloc[:, chromosome == 1]
        print(f"Selected Stocks: {selected_stocks.columns.tolist()}")
        # 如果沒有選擇任何股票，返回非常低的適應度
        if selected_stocks.empty:
          return 0, np.inf  # 無選擇時回報率為 0，風險極大

        # 計算等權重投資組合的回報率
        portfolio_returns = selected_stocks.mean()
        avg_return = portfolio_returns.mean()

        # 計算投資組合風險（使用協方差矩陣）
        cov_matrix = selected_stocks.cov()
        portfolio_std = np.sqrt(np.dot(np.dot(portfolio_returns, cov_matrix), portfolio_returns))

        return avg_return, portfolio_std

    def fitness_function(self, chromosome):
        ret, risk = self.calculate_portfolio_metrics(chromosome)
        risk = max(risk, 1e-10)  # 避免除以零

        sharpe_ratio = ret / risk

        # 獎勵多元化的投資組合
        num_selected_stocks = np.sum(chromosome)
        if num_selected_stocks > 0:
            diversity_bonus = np.log(num_selected_stocks + 1)
        else:
            diversity_bonus = 0

        return sharpe_ratio * diversity_bonus

    def selection(self, population, fitness_scores):
        """
        通過輪盤賭選擇方法選擇父代
        """
        fitness_scores = np.nan_to_num(fitness_scores, nan=-np.inf)
        fitness_scores = fitness_scores - np.min(fitness_scores)

        if np.sum(fitness_scores) == 0:  # 防止歸一化出現問題
            probabilities = np.ones(len(fitness_scores)) / len(fitness_scores)
        else:
            probabilities = fitness_scores / np.sum(fitness_scores)

        parents_indices = np.random.choice(
            len(population),
            size=len(population),
            p=probabilities,
            replace=True
        )

        # 檢查是否有足夠的父代
        if len(parents_indices) < 2:
            raise ValueError("父代數量不足以進行交叉操作")

        return population[parents_indices]

    # def crossover(self, parent1, parent2):
    #     """
    #     單點交叉操作
    #     """
    #     if len(parent1) < 2 or len(parent2) < 2:
    #         # 如果染色體長度不足以進行交叉，返回原始父代
    #         return parent1.copy(), parent2.copy()

    #     crossover_point = np.random.randint(1, len(parent1))
    #     child1 = np.concatenate([parent1[:crossover_point], parent2[crossover_point:]])
    #     child2 = np.concatenate([parent2[:crossover_point], parent1[crossover_point:]])
    #     return child1, child2
    def crossover(self, parent1, parent2):
        """
        單點交叉操作，確保子代不為全零
        """
        if len(parent1) < 2 or len(parent2) < 2:
            return parent1.copy(), parent2.copy()

        crossover_point = np.random.randint(1, len(parent1))
        child1 = np.concatenate([parent1[:crossover_point], parent2[crossover_point:]])
        child2 = np.concatenate([parent2[:crossover_point], parent1[crossover_point:]])

        # 確保子代不為全零
        if np.sum(child1) == 0:
            child1[np.random.randint(len(child1))] = 1
        if np.sum(child2) == 0:
            child2[np.random.randint(len(child2))] = 1

        return child1, child2

    # def mutation(self, chromosome):
    #     """
    #     基因突變：隨機翻轉部分基因
    #     """
    #     mutation_mask = np.random.random(len(chromosome)) < self.mutation_rate
    #     chromosome[mutation_mask] = 1 - chromosome[mutation_mask]
    #     return chromosome
    def mutation(self, chromosome):
        """
        基因突變：隨機翻轉部分基因，增加突變的幅度
        """
        mutation_mask = np.random.random(len(chromosome)) < self.mutation_rate
        chromosome[mutation_mask] = 1 - chromosome[mutation_mask]

        # 確保突變後至少有一個基因為 1
        if np.sum(chromosome) == 0:
            chromosome[np.random.randint(len(chromosome))] = 1
        return chromosome

    def optimize(self):
        """
        執行遺傳演算法優化投資組合
        """
        # 初始化族群
        population = self.initialize_population()

        # 追蹤每代最佳解
        best_fitness_history = []
        best_portfolio_history = []

        # 迭代演化
        for generation in range(self.generations):
            # 計算族群適應度
            fitness_scores = np.array([self.fitness_function(chromosome) for chromosome in population])

            # 記錄當前代最佳解
            best_idx = np.argmax(fitness_scores)
            best_fitness_history.append(fitness_scores[best_idx])
            best_portfolio_history.append(population[best_idx])

            # 選擇
            parents = self.selection(population, fitness_scores)

            # 下一代族群
            next_population = []

            # 交配和突變
            for i in range(0, len(parents), 2):
                if i+1 < len(parents):
                    # 交叉
                    child1, child2 = self.crossover(parents[i], parents[i+1])

                    # 突變
                    child1 = self.mutation(child1)
                    child2 = self.mutation(child2)

                    next_population.extend([child1, child2])

            population = np.array(next_population)

        # 找出整個演化過程中最佳的投資組合
        overall_best_idx = np.argmax(best_fitness_history)
        best_portfolio = best_portfolio_history[overall_best_idx]

        # 計算最佳投資組合的詳細指標
        best_return, best_risk = self.calculate_portfolio_metrics(best_portfolio)

        return {
            'selected_stocks': self.stock_returns.columns[best_portfolio == 1].tolist(),
            'return_rate': best_return,
            'risk': best_risk,
            'sharpe_ratio': best_return / best_risk
        }

# 示範使用
def main():
    start_date = '2020-01-01'
    end_date = '2023-01-01'
    stock_symbols = ['1201', '2454', '2357', '4426', '8271','2330']

    # 生成模擬股票回報率
    stock_returns = fetch_stock_data(stock_symbols, start_date, end_date)

    # 初始化並執行優化
    optimizer = GeneticPortfolioOptimizer(
        stock_returns,
        population_size=100,
        generations=50,
        mutation_rate=0.02
    )

    result = optimizer.optimize()

    print("最佳投資組合結果：")
    print(f"選中股票: {result['selected_stocks']}")
    print(f"平均回報率: {result['return_rate']:.4f}")
    print(f"風險（標準差）: {result['risk']:.4f}")
    print(f"夏普比率: {result['sharpe_ratio']:.4f}")
    print(stock_returns.describe())

if __name__ == "__main__":
    main()

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


[1;30;43m串流輸出內容已截斷至最後 5000 行。[0m
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]
Selected Stocks: [0]

In [36]:
# 簡化版基因演算法：最佳化投資組合

# 匯入必要的套件
import yfinance as yf
import numpy as np
import random

# 固定隨機種子
np.random.seed(42)
random.seed(42)

# 年化報酬與風險計算
def calculate_annual_metrics(data):
    years = (data.index[-1] - data.index[0]).days / 365.0
    start, end = data.iloc[0], data.iloc[-1]
    annual_return = (end / start) ** (1 / years) - 1
    daily_returns = data.pct_change().dropna()
    annual_risk = daily_returns.std() * np.sqrt(len(daily_returns))
    return annual_return, annual_risk

# 取得股票資料
def fetch_stock_data(symbols, start_date, end_date):
    returns, risks = [], []
    for symbol in symbols:
        data = yf.download(f"{symbol}.TW", start=start_date, end=end_date)["Adj Close"]
        ret, risk = calculate_annual_metrics(data)
        returns.append(ret)
        risks.append(risk)
    return np.array(returns), np.array(risks)

# 適應函數
def fitness(chromosome, stock_returns, stock_risks):
    portfolio_return = np.sum(chromosome * stock_returns)
    portfolio_risk = np.sqrt(np.sum((chromosome * stock_risks) ** 2))
    sharpe_ratio = portfolio_return / portfolio_risk if portfolio_risk > 0 else 0
    return 0.8 * sharpe_ratio - 0.006 * np.sum(chromosome)

# 初始化、選擇、交配、突變
def initialize_population(size, num_stocks):
    return [np.random.randint(0, 2, num_stocks) for _ in range(size)]

def select(population, fitness_scores):
    probs = (np.array(fitness_scores) - min(fitness_scores) + 1e-6)
    probs /= probs.sum()
    return population[np.random.choice(len(population), p=probs)]

def crossover(parent1, parent2):
    point = np.random.randint(1, len(parent1))
    return np.concatenate((parent1[:point], parent2[point:])), np.concatenate((parent2[:point], parent1[point:]))

def mutate(chromosome, rate):
    for i in range(len(chromosome)):
        if random.random() < rate:
            chromosome[i] = 1 - chromosome[i]
    return chromosome

# GA 主程式
def genetic_algorithm(stock_returns, stock_risks, num_stocks, population_size=20, generations=200, mutation_rate=0.2):
    population = initialize_population(population_size, num_stocks)
    for _ in range(generations):
        fitness_scores = [fitness(chromo, stock_returns, stock_risks) for chromo in population]
        new_population = []
        for _ in range(population_size // 2):
            parent1, parent2 = select(population, fitness_scores), select(population, fitness_scores)
            child1, child2 = crossover(parent1, parent2)
            new_population.extend([mutate(child1, mutation_rate), mutate(child2, mutation_rate)])
        population = new_population
    best_index = np.argmax([fitness(chromo, stock_returns, stock_risks) for chromo in population])
    return population[best_index]

# 設定參數
start_date, end_date = "2020-01-01", "2023-01-01"
stock_symbols = ["2330", "1201", "2454", "2357", "4426", "8271"]
stock_returns, stock_risks = fetch_stock_data(stock_symbols, start_date, end_date)

# 執行演算法
best_portfolio = genetic_algorithm(stock_returns, stock_risks, len(stock_symbols))

# 輸出結果
print("最佳選股組合:", best_portfolio)
print("總報酬:", np.sum(best_portfolio * stock_returns))
print("總風險:", np.sqrt(np.sum((best_portfolio * stock_risks) ** 2)))
for i, symbol in enumerate(stock_symbols):
    print(f"{symbol}: {'選擇' if best_portfolio[i] else '不選擇'}")


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


最佳選股組合: [1 0 1 1 1 1]
總報酬: 1.4712498920044437
總風險: 3.1434678812664174
2330: 選擇
1201: 不選擇
2454: 選擇
2357: 選擇
4426: 選擇
8271: 選擇
