<a href="https://colab.research.google.com/github/nana881023/Financial_Big_Data_Analysis/blob/main/Week13_HW.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 安裝套件

In [6]:
# 安裝需要的套件
!pip install yfinance pandas numpy



In [7]:
# 匯入所需的函式庫
import numpy as np
import pandas as pd
from typing import List, Tuple
from datetime import datetime, timedelta
import yfinance as yf
import warnings
warnings.filterwarnings('ignore')  # 忽略警告訊息

## 定義股票池

In [14]:
class StockUniverse:
  """定義股票池類別，包含台股市值前30大股票"""
  def __init__(self):
    # 定義台股市值前30大（2024年）
    self.stocks = {
      # 半導體產業
      '2330.TW': '台積電',    # 台灣市值第一
      '2454.TW': '聯發科',    # 台灣市值第三
      '2379.TW': '瑞昱',      # 台灣市值第十八
      '2308.TW': '台達電',    # 台灣市值第五

      # 電子零組件
      '2317.TW': '鴻海',      # 台灣市值第二
      '2382.TW': '廣達',      # 台灣市值第十九
      '2327.TW': '國巨',      # 台灣市值第十六
      '3711.TW': '日月光投控', # 台灣市值第八

      # 金融業
      '2881.TW': '富邦金',    # 台灣市值第六
      '2882.TW': '國泰金',    # 台灣市值第七
      '2891.TW': '中信金',    # 台灣市值第九
      '2886.TW': '兆豐金',    # 台灣市值第十
      '2884.TW': '玉山金',    # 台灣市值第十一
      '2885.TW': '元大金',    # 台灣市值第十五
      '2887.TW': '台新金',    # 台灣市值第二十
      '2890.TW': '永豐金',    # 台灣市值第二十三

      # 傳統產業
      '1301.TW': '台塑',      # 台灣市值第十二
      '1303.TW': '南亞',      # 台灣市值第十四
      '1326.TW': '台化',      # 台灣市值第十七
      '2002.TW': '中鋼',      # 台灣市值第二十一
      '1216.TW': '統一',      # 台灣市值第二十二

      # 運輸產業
      '2603.TW': '長榮',      # 台灣市值第二十四
      '2609.TW': '陽明',      # 台灣市值第二十九

      # 其他產業
      '2412.TW': '中華電',    # 台灣市值第四
      '3008.TW': '大立光',    # 台灣市值第十三
      '2912.TW': '統一超',    # 台灣市值第二十五
      '1101.TW': '台泥',      # 台灣市值第二十六
      '2357.TW': '華碩',      # 台灣市值第二十七
      '2301.TW': '光寶科',    # 台灣市值第二十八
      '3045.TW': '台灣大',    # 台灣市值第三十
    }

  def get_stock_data(self, start_date: str, end_date: str) -> pd.DataFrame:
    """
    下載並處理股票歷史數據

    參數:
    start_date (str): 起始日期，格式為 'YYYY-MM-DD'
    end_date (str): 結束日期，格式為 'YYYY-MM-DD'

    返回:
    pd.DataFrame: 包含所有股票日報酬率的DataFrame
    """
    print("開始下載股票數據...")

    # 批量下載所有股票數據
    symbols = list(self.stocks.keys())
    data = yf.download(symbols, start=start_date, end=end_date)['Adj Close']

    # 計算日報酬率
    returns = data.pct_change()

    # 移除缺失值
    returns = returns.dropna()

    print("數據下載完成！")
    return returns

## 最佳化類別

In [18]:
class PortfolioOptimizer:
  """
  使用基因演算法實現投資組合最佳化的類別

  參數:
  - returns (pd.DataFrame): 股票日報酬率資料
  - stock_universe (StockUniverse): 股票池物件
  - population_size (int): 族群大小，預設50
  - generations (int): 迭代代數，預設100
  - mutation_rate (float): 突變機率，預設0.05
  - tournament_size (int): 競賽選擇大小，預設3
  - min_stocks (int): 最少選股數量，預設4
  - max_stocks (int): 最多選股數量，預設8
  """
  def __init__(
    self,
    returns: pd.DataFrame,
    stock_universe: StockUniverse,
    population_size: int = 50,
    generations: int = 100,
    mutation_rate: float = 0.05,  # 提高突變率以增加多樣性
    tournament_size: int = 3,
    min_stocks: int = 4,  # 增加最小持股數以提高分散度
    max_stocks: int = 8   # 增加最大持股數以提高分散度
  ):
    self.returns = returns.values  # 轉換為numpy array以提升計算效率
    self.stock_universe = stock_universe
    self.n_stocks = len(returns.columns)
    self.population_size = population_size
    self.generations = generations
    self.mutation_rate = mutation_rate
    self.tournament_size = tournament_size
    self.min_stocks = min_stocks
    self.max_stocks = max_stocks
    self.stock_names = returns.columns

  def calculate_portfolio_metrics(self, weights: np.ndarray) -> Tuple[float, float]:
    """
    計算投資組合的年化報酬率和風險

    參數:
    weights (np.ndarray): 投資權重數組

    返回:
    Tuple[float, float]: (年化報酬率, 年化風險)
    """
    # 計算投資組合報酬率序列
    portfolio_returns = np.dot(self.returns, weights)

    # 計算年化報酬率 (假設一年252個交易日)
    annual_return = np.mean(portfolio_returns) * 252

    # 計算年化風險
    annual_risk = np.std(portfolio_returns) * np.sqrt(252)

    return annual_return, annual_risk

  def calculate_fitness(self, chromosome: np.ndarray) -> Tuple[float, float]:
    """
    計算染色體的適應度（年化報酬率）和風險

    參數:
    chromosome (np.ndarray): 二進制染色體序列

    返回:
    Tuple[float, float]: (適應度值, 風險值)
    """
    n_selected = np.sum(chromosome)

    # 檢查選股數量是否在允許範圍內
    if not (self.min_stocks <= n_selected <= self.max_stocks):
      return -np.inf, np.inf

    if n_selected == 0:
      return -np.inf, np.inf

    # 計算等權重
    weights = chromosome / n_selected

    # 計算投資組合指標
    return self.calculate_portfolio_metrics(weights)

  def tournament_selection(self, population: np.ndarray, fitness_values: np.ndarray) -> np.ndarray:
    """
    使用競賽選擇法選出優秀個體

    參數:
    population (np.ndarray): 當前族群
    fitness_values (np.ndarray): 適應度值數組

    返回:
    np.ndarray: 選出的個體
    """
    selected = np.zeros_like(population)
    for i in range(len(population)):
      # 隨機選擇tournament_size個個體進行競賽
      tournament_idx = np.random.choice(len(population), self.tournament_size)
      # 選出適應度最高的個體
      winner_idx = tournament_idx[np.argmax(fitness_values[tournament_idx])]
      selected[i] = population[winner_idx]
    return selected

  def crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    執行單點交配操作

    參數:
    parent1, parent2 (np.ndarray): 父代染色體

    返回:
    Tuple[np.ndarray, np.ndarray]: 兩個子代染色體
    """
    # 隨機選擇交配點
    point = np.random.randint(1, len(parent1))
    child1 = np.concatenate([parent1[:point], parent2[point:]])
    child2 = np.concatenate([parent2[:point], parent1[point:]])

    # 確保選股數量符合限制
    for child in [child1, child2]:
      while np.sum(child) < self.min_stocks:
        zero_indices = np.where(child == 0)[0]
        if len(zero_indices) > 0:
          idx = np.random.choice(zero_indices)
          child[idx] = 1

      while np.sum(child) > self.max_stocks:
        one_indices = np.where(child == 1)[0]
        if len(one_indices) > 0:
          idx = np.random.choice(one_indices)
          child[idx] = 0

    return child1, child2

  def mutation(self, chromosome: np.ndarray) -> np.ndarray:
    """
    執行突變操作

    參數:
    chromosome (np.ndarray): 待突變的染色體

    返回:
    np.ndarray: 突變後的染色體
    """
    mutated = chromosome.copy()
    for i in range(len(mutated)):
      if np.random.random() < self.mutation_rate:
        mutated[i] = 1 - mutated[i]  # 反轉基因

    # 確保選股數量符合限制
    while np.sum(mutated) < self.min_stocks:
      zero_indices = np.where(mutated == 0)[0]
      if len(zero_indices) > 0:
        idx = np.random.choice(zero_indices)
        mutated[idx] = 1

    while np.sum(mutated) > self.max_stocks:
      one_indices = np.where(mutated == 1)[0]
      if len(one_indices) > 0:
        idx = np.random.choice(one_indices)
        mutated[idx] = 0

    return mutated

  def optimize(self) -> Tuple[np.ndarray, float, float]:
    """
    執行基因演算法最佳化

    返回:
    Tuple[np.ndarray, float, float]: (最佳染色體, 最佳報酬率, 對應風險)
    """
    # 初始化族群，確保每個染色體都符合選股數量限制
    population = []
    while len(population) < self.population_size:
      chromosome = np.random.randint(2, size=self.n_stocks)
      if self.min_stocks <= np.sum(chromosome) <= self.max_stocks:
        population.append(chromosome)
    population = np.array(population)

    best_chromosome = None
    best_return = -np.inf
    best_risk = np.inf
    best_generation = 0  # 記錄找到最佳解的代數

    print("\n開始最佳化過程...")
    for generation in range(self.generations):
      # 計算這一代中每個染色體的適應度
      fitness_results = [self.calculate_fitness(chrom) for chrom in population]
      returns = np.array([r for r, _ in fitness_results])

      # 找出這一代中表現最好的染色體
      current_max_idx = np.argmax(returns)
      current_return = returns[current_max_idx]

      # 如果找到更好的解，更新最佳解並顯示詳細資訊
      if current_return > best_return and not np.isinf(current_return):
        best_return = current_return
        best_risk = fitness_results[current_max_idx][1]
        best_chromosome = population[current_max_idx].copy()
        best_generation = generation + 1

        print(f"\n第 {generation + 1} 代找到更好的解：")
        print(f"報酬率: {best_return:.2%}")
        print("選股組合:")
        selected_stocks = []
        for i, selected in enumerate(best_chromosome):
          if selected:
            symbol = self.stock_names[i]
            selected_stocks.append(f"{symbol} ({self.stock_universe.stocks[symbol]})")
        print(", ".join(selected_stocks))

      # 每10代顯示進度
      if (generation + 1) % 10 == 0:
        print(f"\n第 {generation + 1} 代完成")
        print(f"當前最佳報酬率: {best_return:.2%} (來自第 {best_generation} 代)")
        print("當前最佳選股組合:")
        selected_stocks = []
        for i, selected in enumerate(best_chromosome):
          if selected:
            symbol = self.stock_names[i]
            selected_stocks.append(f"{symbol} ({self.stock_universe.stocks[symbol]})")
        print(", ".join(selected_stocks))

      # 選擇、交配和突變
      population = self.tournament_selection(population, returns)
      new_population = []
      for i in range(0, self.population_size, 2):
        p1, p2 = population[i], population[min(i+1, len(population)-1)]
        c1, c2 = self.crossover(p1, p2)
        new_population.extend([self.mutation(c1), self.mutation(c2)])
      population = np.array(new_population)

    return best_chromosome, best_return, best_risk

### 主程式

In [16]:
def run_portfolio_optimization():
  """執行投資組合最佳化的主函數"""

  # 建立股票池
  stock_universe = StockUniverse()

  # 設定回測期間（預設使用過去一年的資料）
  end_date = datetime.now()
  start_date = end_date - timedelta(days=365)

  # 下載股票資料
  returns_data = stock_universe.get_stock_data(
    start_date.strftime('%Y-%m-%d'),
    end_date.strftime('%Y-%m-%d')
  )

  # 建立最佳化器
  optimizer = PortfolioOptimizer(
    returns=returns_data,
    stock_universe=stock_universe,
    population_size=100,    # 族群大小
    generations=100,        # 迭代代數
    mutation_rate=0.05,     # 突變率
    tournament_size=3,      # 競賽選擇大小
    min_stocks=4,          # 最少選股數量
    max_stocks=8           # 最多選股數量
  )

  # 執行最佳化
  best_portfolio, best_return, best_risk = optimizer.optimize()

  # 輸出結果
  print("\n最佳投資組合:")
  print("\n選擇的股票:")
  selected_stocks = []
  for i, selected in enumerate(best_portfolio):
    if selected:
      symbol = returns_data.columns[i]
      print(f"- {symbol} ({stock_universe.stocks[symbol]})")
      selected_stocks.append(symbol)

  print(f"\n投資組合績效:")
  print(f"年化報酬率: {best_return:.2%}")
  print(f"年化風險: {best_risk:.2%}")
  print(f"夏普比率: {best_return/best_risk:.2f}")

  # 繪製所選股票的走勢圖
  print("\n選股結果分析:")
  n_selected = sum(best_portfolio)
  print(f"選出 {n_selected} 支股票")
  print(f"分散度: {n_selected/len(best_portfolio):.2%}")



## 執行

In [19]:
if __name__ == "__main__":
  run_portfolio_optimization()

[**********            20%                       ]  6 of 30 completed

開始下載股票數據...


[*********************100%***********************]  30 of 30 completed


數據下載完成！

開始最佳化過程...

第 1 代找到更好的解：
報酬率: 47.42%
選股組合:
2379.TW (瑞昱), 2382.TW (廣達), 2412.TW (中華電), 2454.TW (聯發科), 2603.TW (長榮), 2609.TW (陽明), 2882.TW (國泰金), 2885.TW (元大金)

第 2 代找到更好的解：
報酬率: 50.63%
選股組合:
2330.TW (台積電), 2357.TW (華碩), 2412.TW (中華電), 2454.TW (聯發科), 2603.TW (長榮), 2882.TW (國泰金), 2885.TW (元大金)

第 3 代找到更好的解：
報酬率: 57.44%
選股組合:
2308.TW (台達電), 2454.TW (聯發科), 2603.TW (長榮), 2609.TW (陽明), 2881.TW (富邦金)

第 5 代找到更好的解：
報酬率: 71.43%
選股組合:
2317.TW (鴻海), 2603.TW (長榮), 2609.TW (陽明), 2881.TW (富邦金)

第 10 代完成
當前最佳報酬率: 71.43% (來自第 5 代)
當前最佳選股組合:
2317.TW (鴻海), 2603.TW (長榮), 2609.TW (陽明), 2881.TW (富邦金)

第 14 代找到更好的解：
報酬率: 72.06%
選股組合:
2317.TW (鴻海), 2357.TW (華碩), 2603.TW (長榮), 2609.TW (陽明)

第 15 代找到更好的解：
報酬率: 72.11%
選股組合:
2317.TW (鴻海), 2330.TW (台積電), 2603.TW (長榮), 2609.TW (陽明), 2882.TW (國泰金)

第 18 代找到更好的解：
報酬率: 77.25%
選股組合:
2317.TW (鴻海), 2330.TW (台積電), 2603.TW (長榮), 2609.TW (陽明)

第 20 代完成
當前最佳報酬率: 77.25% (來自第 18 代)
當前最佳選股組合:
2317.TW (鴻海), 2330.TW (台積電), 2603.TW (長榮), 2609.TW (陽明)

第 30 代完成
當前最佳報酬率: 77