In [2]:
import os
import pandas as pd
import numpy as np
import logging
from pathlib import Path
from typing import Dict, List, Tuple
from datetime import datetime

In [4]:
class FileValidator:
    """檔案驗證器"""
    
    def __init__(self, base_path: str = "D:/Min/Python/Project/FA_Data"):
        self.base_path = Path(base_path)
        self.setup_logger()
        
        # 定義必要檔案和欄位
        self.required_files = {
            "market_index.csv": {
                "path": "meta_data/market_index.csv",
                "fields": ["Date", "Open", "High", "Low", "Close", "Adj Close", "Volume"]
            },
            "industry_index.csv": {
                "path": "meta_data/industry_index.csv",
                "fields": ["日期", "指數名稱", "收盤指數", "漲跌", "漲跌點數", "漲跌百分比"]
            },
            "stock_data_whole.csv": {
                "path": "meta_data/stock_data_whole.csv",
                "fields": [
                    "證券代號", "證券名稱", "日期", "開盤價", "最高價", "最低價", "收盤價",
                    "成交股數", "成交筆數", "成交金額", "漲跌(+/-)", "漲跌價差",
                    "最後揭示買價", "最後揭示買量", "最後揭示賣價", "最後揭示賣量", "本益比"
                ]
            },
            "companies_final.csv": {
                "path": "meta_data/companies_final.csv",
                "fields": ["stock_id", "industry_category"]
            },
            "all_stocks_data.csv": {
                "path": "meta_data/all_stocks_data.csv",
                "fields": [
                    # 基本資料
                    "證券代號", "證券名稱", "日期", "開盤價", "最高價", "最低價", "收盤價",
                    "成交股數", "成交筆數", "成交金額", "本益比",
                    # 技術指標
                    "SMA30", "DEMA30", "EMA30", "RSI",
                    "MACD", "MACD_signal", "MACD_hist",
                    "slowk", "slowd", "TSF", "middleband", "SAR",                    
                ]
            }
        }
    
    def setup_logger(self):
        """設置日誌"""
        self.logger = logging.getLogger('FileValidator')
        self.logger.setLevel(logging.INFO)
        
        # 檔案處理器
        fh = logging.FileHandler(
            self.base_path / 'logs' / f'file_validation_{datetime.now():%Y%m%d}.log',
            encoding='utf-8'
        )
        fh.setLevel(logging.INFO)
        
        # 控制台處理器
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)
        
        # 設置格式
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)
        
        self.logger.addHandler(fh)
        self.logger.addHandler(ch)
    
    def check_file_exists(self, file_info: Dict) -> bool:
        """檢查檔案是否存在"""
        file_path = self.base_path / file_info["path"]
        exists = file_path.exists()
        if not exists:
            self.logger.error(f"找不到檔案: {file_path}")
        return exists
    
    def validate_file_fields(self, file_name: str, file_info: Dict) -> Tuple[bool, List[str]]:
        """驗證檔案欄位"""
        try:
            file_path = self.base_path / file_info["path"]
            df = pd.read_csv(file_path)
            
            # 檢查必要欄位
            required_fields = file_info["fields"]
            missing_fields = [field for field in required_fields if field not in df.columns]
            
            if missing_fields:
                self.logger.error(f"檔案 {file_name} 缺少必要欄位: {missing_fields}")
                return False, missing_fields
            
            # 檢查欄位數據有效性
            empty_fields = []
            for field in required_fields:
                if df[field].isna().all():
                    empty_fields.append(field)
                    self.logger.warning(f"檔案 {file_name} 的欄位 {field} 全為空值")
            
            return len(empty_fields) == 0, empty_fields
            
        except Exception as e:
            self.logger.error(f"驗證檔案 {file_name} 時發生錯誤: {str(e)}")
            return False, []
    
    def validate_all_files(self) -> Dict:
        """驗證所有必要檔案"""
        validation_results = {}
        
        for file_name, file_info in self.required_files.items():
            self.logger.info(f"開始驗證檔案: {file_name}")
            
            # 檢查檔案存在
            file_exists = self.check_file_exists(file_info)
            if not file_exists:
                validation_results[file_name] = {
                    "exists": False,
                    "fields_valid": False,
                    "missing_fields": [],
                    "status": "檔案不存在"
                }
                continue
            
            # 驗證欄位
            fields_valid, missing_fields = self.validate_file_fields(file_name, file_info)
            
            validation_results[file_name] = {
                "exists": True,
                "fields_valid": fields_valid,
                "missing_fields": missing_fields,
                "status": "正常" if fields_valid else "缺少必要欄位"
            }
            
            # 記錄驗證結果
            if fields_valid:
                self.logger.info(f"檔案 {file_name} 驗證通過")
            else:
                self.logger.warning(f"檔案 {file_name} 驗證失敗")
        
        return validation_results

    def generate_validation_report(self, results: Dict):
        """生成驗證報告"""
        report_path = self.base_path / 'logs' / f'validation_report_{datetime.now():%Y%m%d}.txt'
        
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("檔案驗證報告\n")
            f.write(f"生成時間: {datetime.now():%Y-%m-%d %H:%M:%S}\n\n")
            
            for file_name, result in results.items():
                f.write(f"\n檔案: {file_name}\n")
                f.write(f"存在性: {'是' if result['exists'] else '否'}\n")
                if result['exists']:
                    f.write(f"欄位驗證: {'通過' if result['fields_valid'] else '失敗'}\n")
                    if not result['fields_valid']:
                        f.write(f"缺少欄位: {', '.join(result['missing_fields'])}\n")
                f.write(f"狀態: {result['status']}\n")
                f.write("-" * 50 + "\n")

def main():
    """主程序"""
    validator = FileValidator()
    validator.logger.info("開始檔案驗證程序")
    
    try:
        # 執行驗證
        results = validator.validate_all_files()
        
        # 生成報告
        validator.generate_validation_report(results)
        
        # 檢查是否全部通過
        all_passed = all(
            r["exists"] and r["fields_valid"] 
            for r in results.values()
        )
        
        if all_passed:
            validator.logger.info("所有檔案驗證通過")
        else:
            validator.logger.warning("部分檔案驗證失敗，請查看詳細報告")
        
        return all_passed
        
    except Exception as e:
        validator.logger.error(f"驗證過程發生錯誤: {str(e)}")
        return False

if __name__ == "__main__":
    success = main()
    print(f"驗證結果: {'成功' if success else '失敗'}")

2024-11-21 20:37:17,698 - FileValidator - INFO - 開始檔案驗證程序
2024-11-21 20:37:17,698 - FileValidator - INFO - 開始檔案驗證程序
2024-11-21 20:37:17,703 - FileValidator - INFO - 開始驗證檔案: market_index.csv
2024-11-21 20:37:17,703 - FileValidator - INFO - 開始驗證檔案: market_index.csv
2024-11-21 20:37:17,776 - FileValidator - INFO - 檔案 market_index.csv 驗證通過
2024-11-21 20:37:17,776 - FileValidator - INFO - 檔案 market_index.csv 驗證通過
2024-11-21 20:37:17,779 - FileValidator - INFO - 開始驗證檔案: industry_index.csv
2024-11-21 20:37:17,779 - FileValidator - INFO - 開始驗證檔案: industry_index.csv
2024-11-21 20:37:18,067 - FileValidator - INFO - 檔案 industry_index.csv 驗證通過
2024-11-21 20:37:18,067 - FileValidator - INFO - 檔案 industry_index.csv 驗證通過
2024-11-21 20:37:18,071 - FileValidator - INFO - 開始驗證檔案: stock_data_whole.csv
2024-11-21 20:37:18,071 - FileValidator - INFO - 開始驗證檔案: stock_data_whole.csv
  df = pd.read_csv(file_path)
2024-11-21 20:37:27,996 - FileValidator - INFO - 檔案 stock_data_whole.csv 驗證通過
2024-11-21 20:37:27,

驗證結果: 成功


In [3]:
class FeatureGeneratorPreCheck:
    """特徵生成器的前置檢查"""
    
    def __init__(self, base_path: str = "D:/Min/Python/Project/FA_Data"):
        self.base_path = Path(base_path)
        self.logger = logging.getLogger('FeatureGeneratorPreCheck')

    def analyze_industry_mappings(self):
        """分析產業分類和產業指數的對應關係"""
        try:
            # 1. 讀取產業指數數據
            print("\n讀取產業指數數據...")
            industry_index_df = pd.read_csv(
                self.base_path / 'meta_data' / 'industry_index.csv'
            )
            
            # 2. 讀取公司產業分類
            print("讀取公司產業分類...")
            company_df = pd.read_csv(
                self.base_path / 'meta_data' / 'companies_final.csv',
                dtype={'stock_id': str}
            )
            
            # 3. 獲取所有指數名稱並標準化
            print("\n分析產業指數名稱...")
            index_names = industry_index_df['指數名稱'].unique()
            print(f"找到 {len(index_names)} 個產業指數")
            
            # 將指數名稱標準化並分類
            standard_indices = {}
            for name in index_names:
                base_name = name.replace('報酬指數', '').replace('指數', '').strip()
                if base_name not in standard_indices:
                    standard_indices[base_name] = []
                standard_indices[base_name].append(name)
            
            print(f"\n標準化後有 {len(standard_indices)} 個不同產業")
            
            # 4. 獲取所有公司產業分類
            company_industries = company_df['industry_category'].unique()
            print(f"公司分類中有 {len(company_industries)} 個產業")
            
            # 5. 分析對應關係
            print("\n=== 產業對應分析 ===")
            mappings = {}
            unmapped = []
            
            for industry in company_industries:
                base_industry = industry.replace('類', '').strip()
                
                # 直接對應
                if base_industry in standard_indices:
                    mappings[industry] = [base_industry]
                    print(f"直接對應: {industry} -> {base_industry}")
                    continue
                    
                # 模糊匹配
                matches = []
                for std_name in standard_indices.keys():
                    if base_industry in std_name or std_name in base_industry:
                        matches.append(std_name)
                
                if matches:
                    mappings[industry] = matches
                    print(f"可能對應: {industry} -> {matches}")
                else:
                    unmapped.append(industry)
                    print(f"未找到對應: {industry}")
            
            # 6. 輸出分析摘要
            print("\n=== 分析摘要 ===")
            print(f"總共分析了 {len(company_industries)} 個公司產業分類")
            print(f"找到明確對應: {len([k for k,v in mappings.items() if len(v)==1])} 個")
            print(f"需要確認對應: {len([k for k,v in mappings.items() if len(v)>1])} 個")
            print(f"未找到對應: {len(unmapped)} 個")
            
            # 7. 輸出建議對應表
            print("\n=== 建議的產業對應表 ===")
            print("industry_mapping = {")
            for industry, matches in mappings.items():
                if len(matches) == 1:
                    print(f"    '{industry}': '{matches[0]}',")
                else:
                    print(f"    # '{industry}': '需要確認: {matches}',")
            for industry in unmapped:
                print(f"    # '{industry}': '需要手動確認',")
            print("}")
            
            return mappings, unmapped
            
        except Exception as e:
            print(f"產業對應分析失敗: {str(e)}")
            return None, None
        
    def check_industry_mapping(self) -> bool:
        """檢查產業分類映射"""
        try:
            # 1. 檢查產業分類檔案
            mapping_file = self.base_path / 'meta_data' / 'companies_final.csv'
            if not mapping_file.exists():
                self.logger.error("找不到產業分類檔案: companies_final.csv")
                return False
                
            # 2. 檢查產業分類內容
            df = pd.read_csv(mapping_file, dtype={'stock_id': str})
            
            # 檢查必要欄位
            if not all(col in df.columns for col in ['stock_id', 'industry_category']):
                self.logger.error("產業分類檔案缺少必要欄位")
                return False
                
            # 檢查是否有空值
            null_stocks = df[df['industry_category'].isnull()]
            if not null_stocks.empty:
                self.logger.warning(f"有 {len(null_stocks)} 支股票缺少產業分類")
                self.logger.debug(f"缺少產業分類的股票: {null_stocks['stock_id'].tolist()}")
            
            return True
            
        except Exception as e:
            self.logger.error(f"產業分類檢查失敗: {str(e)}")
            return False
    
    def check_industry_analysis_files(self) -> bool:
        """檢查產業分析檔案"""
        try:
            # 1. 建立產業名稱對應表
            industry_mapping = {
                '金融保險類': '金融保險',  # 修正產業名稱對應
                '電子工業類': '電子工',    # 修正檔案名稱對應
                # 可以加入其他的對應關係
            }
            
            # 2. 讀取產業指數資料
            industry_index_path = self.base_path / 'meta_data' / 'industry_index.csv'
            industry_index_df = pd.read_csv(industry_index_path)
            
            # 提取標準產業名稱
            standard_industries = industry_index_df[
                industry_index_df['指數名稱'].str.contains('報酬指數')
            ]['指數名稱'].apply(lambda x: x.replace('報酬指數', '').replace('類', '').strip()).unique()
            
            # 3. 讀取公司產業分類
            company_industry_path = self.base_path / 'meta_data' / 'companies_final.csv'
            company_df = pd.read_csv(company_industry_path, dtype={'stock_id': str})
            company_industries = company_df['industry_category'].unique()
            
            # 4. 分類並檢查產業
            missing_files = []
            nonstandard_industries = []
            
            for industry in company_industries:
                # 移除"類"字並標準化名稱
                industry_name = industry.replace('類', '').strip()
                
                # 使用對應表進行名稱轉換
                mapped_name = industry_mapping.get(industry, industry_name)
                
                # 檢查是否為標準產業
                if mapped_name not in standard_industries:
                    if industry not in ['農業科技', '電子商務類']:  # 已知的新產業不顯示警告
                        nonstandard_industries.append(industry)
                    continue
                
                # 檢查分析檔案
                file_found = False
                for dir_name in ['return_index', 'price_index']:
                    dir_path = self.base_path / 'industry_analysis' / dir_name
                    # 嘗試多種可能的檔案名模式
                    patterns = [
                        f"{mapped_name}_*.json",
                        f"{industry_name}_*.json",
                        f"{mapped_name.replace('類', '')}_*.json"
                    ]
                    
                    for pattern in patterns:
                        if list(dir_path.glob(pattern)):
                            file_found = True
                            break
                            
                    if file_found:
                        break
                
                if not file_found:
                    missing_files.append(industry)
            
            # 5. 輸出結果
            if nonstandard_industries:
                self.logger.warning(
                    "需要確認產業對應關係的項目:\n"
                    f"{nonstandard_industries}\n"
                    "請檢查這些產業是否應該對應到現有的產業指數"
                )
            
            if missing_files:
                self.logger.error(
                    "缺少以下產業的分析檔案:\n"
                    f"{missing_files}\n"
                    "請確認檔案是否存在或名稱是否正確"
                )
            
            # 6. 生成建議動作
            if nonstandard_industries or missing_files:
                self.logger.info("\n建議動作:")
                if nonstandard_industries:
                    self.logger.info(
                        "1. 請在 companies_final.csv 中檢查以下產業的分類:\n"
                        f"   {nonstandard_industries}\n"
                        "   - 確認是否應該對應到現有產業指數\n"
                        "   - 如果是新產業，請確認是否需要建立新的產業指數"
                    )
                if missing_files:
                    self.logger.info(
                        "2. 請檢查以下產業的分析檔案:\n"
                        f"   {missing_files}\n"
                        "   - 確認檔案是否已生成\n"
                        "   - 檢查檔案命名是否正確"
                    )
            
            # 只有標準產業缺少分析檔案才算失敗
            return len(missing_files) == 0
            
        except Exception as e:
            self.logger.error(f"產業分析檔案檢查失敗: {str(e)}")
            return False
    
    def check_technical_indicators(self) -> bool:
        """檢查技術指標數據"""
        try:
            # 1. 檢查全部技術指標檔案
            tech_file = self.base_path / 'meta_data' / 'all_stocks_data.csv'
            if not tech_file.exists():
                self.logger.error("找不到技術指標檔案")
                return False
            
            # 2. 檢查技術指標內容
            df = pd.read_csv(tech_file, dtype={'證券代號': str})
            
            # 必要的技術指標
            required_indicators = [
                'SMA30', 'DEMA30', 'EMA30', 'RSI',
                'MACD', 'MACD_signal', 'MACD_hist',
                'slowk', 'slowd', 'TSF', 'middleband', 'SAR'
            ]
            
            missing_indicators = [ind for ind in required_indicators if ind not in df.columns]
            if missing_indicators:
                self.logger.error(f"缺少必要的技術指標: {missing_indicators}")
                return False
            
            return True
            
        except Exception as e:
            self.logger.error(f"技術指標檢查失敗: {str(e)}")
            return False
    
    def check_and_create_directory_structure(self) -> bool:
        """檢查目錄結構，如果不存在就建立"""
        try:
            required_dirs = {
                "meta_data": ["backup"],  # logs可以動態建立，先從必要目錄清單移除
                "industry_analysis": ["return_index", "price_index"],
                "technical_analysis": None,
                "features": None
            }
            
            # 先建立主目錄
            for main_dir, sub_dirs in required_dirs.items():
                main_path = self.base_path / main_dir
                if not main_path.exists():
                    self.logger.warning(f"建立主目錄: {main_dir}")
                    main_path.mkdir(parents=True, exist_ok=True)
                    
                if sub_dirs:
                    for sub_dir in sub_dirs:
                        sub_path = main_path / sub_dir
                        if not sub_path.exists():
                            self.logger.warning(f"建立子目錄: {main_dir}/{sub_dir}")
                            sub_path.mkdir(parents=True, exist_ok=True)
            
            # 特別處理 logs 目錄
            logs_dir = self.base_path / "meta_data" / "logs"
            if not logs_dir.exists():
                self.logger.info(f"建立日誌目錄: {logs_dir}")
                logs_dir.mkdir(parents=True, exist_ok=True)
            
            return True
                
        except Exception as e:
            self.logger.error(f"目錄結構檢查/建立失敗: {str(e)}")
            return False

def main():
    """主程序"""
    checker = FeatureGeneratorPreCheck()
    
    # 1. 檢查目錄結構
    if not checker.check_and_create_directory_structure():
        print("目錄結構檢查失敗，請確認必要目錄都存在")
        return False
    
    # 2. 先執行產業對應分析
    print("\n=== 執行產業對應分析 ===")
    mappings, unmapped = checker.analyze_industry_mappings()
    if mappings is None:
        print("產業對應分析失敗")
        return False
        
    # 等待使用者確認
    input("\n請查看產業對應分析結果，按 Enter 繼續...")
    
    # 3. 檢查產業分類映射
    if not checker.check_industry_mapping():
        print("產業分類檢查失敗，請確認產業分類資料正確")
        return False
    
    # 4. 檢查產業分析檔案
    if not checker.check_industry_analysis_files():
        print("產業分析檔案檢查失敗，請確認產業分析檔案完整")
        return False
    
    # 5. 檢查技術指標
    if not checker.check_technical_indicators():
        print("技術指標檢查失敗，請確認技術指標數據完整")
        return False
    
    print("所有前置檢查通過，可以開始生成特徵")
    return True

if __name__ == "__main__":
    success = main()
    print(f"前置檢查結果: {'成功' if success else '失敗'}")


=== 執行產業對應分析 ===

讀取產業指數數據...
讀取公司產業分類...

分析產業指數名稱...
找到 77 個產業指數

標準化後有 42 個不同產業
公司分類中有 37 個產業

=== 產業對應分析 ===
可能對應: 水泥類 -> ['水泥窯製類', '水泥類']
可能對應: 其他類 -> ['其他電子類', '其他類']
可能對應: 食品類 -> ['食品類']
可能對應: 電器電纜類 -> ['電器電纜類']
可能對應: 觀光類 -> ['觀光類', '觀光餐旅類']
可能對應: 觀光餐旅類 -> ['觀光餐旅類']
可能對應: 生技醫療類 -> ['化學生技醫療類', '生技醫療類']
可能對應: 塑膠類 -> ['塑膠化工類', '塑膠類']
可能對應: 建材營造類 -> ['建材營造類']
可能對應: 汽車類 -> ['汽車類']
可能對應: 電子零組件類 -> ['電子零組件類']
可能對應: 綠能環保類 -> ['綠能環保類']
可能對應: 紡織纖維類 -> ['紡織纖維類']
可能對應: 貿易百貨類 -> ['貿易百貨類']
可能對應: 電子工業類 -> ['電子工業類']
可能對應: 鋼鐵類 -> ['鋼鐵類']
可能對應: 電機機械類 -> ['電機機械類']
可能對應: 電腦及週邊設備類 -> ['電腦及週邊設備類']
可能對應: 化學生技醫療類 -> ['化學生技醫療類']
可能對應: 化學類 -> ['化學生技醫療類', '化學類']
可能對應: 其他電子類 -> ['其他電子類']
可能對應: 玻璃陶瓷類 -> ['玻璃陶瓷類']
可能對應: 造紙類 -> ['造紙類']
可能對應: 橡膠類 -> ['橡膠類']
可能對應: 航運類 -> ['航運類']
可能對應: 半導體類 -> ['半導體類']
可能對應: 通信網路類 -> ['通信網路類']
可能對應: 光電類 -> ['光電類']
可能對應: 資訊服務類 -> ['資訊服務類']
可能對應: 油電燃氣類 -> ['油電燃氣類']
可能對應: 運動休閒類 -> ['運動休閒類']
可能對應: 金融保險類 -> ['金融保險類']
可能對應: 居家生活類 -> ['居家生活類']
可能對應: 數位雲端類 -> ['數位雲端類']
可能對應: 電子通路類 -> [


請查看產業對應分析結果，按 Enter 繼續... 


需要確認產業對應關係的項目:
['電子工業類']
請檢查這些產業是否應該對應到現有的產業指數


所有前置檢查通過，可以開始生成特徵
前置檢查結果: 成功


In [23]:

# 讀取industry_index.csv
industry_index_path = r"D:\Min\Python\Project\FA_Data\meta_data\companies.csv"
industry_df = pd.read_csv(industry_index_path)

# 顯示所有唯一的產業指數名稱
print("companies_final中的產業類別:")
unique_industries = industry_df['industry_category'].unique()
print("\n".join(sorted(unique_industries)))

# 計算產業指數數量
print(f"\n總共有 {len(unique_industries)} 個產業指數")

companies_final中的產業類別:
ETF
ETN
Index
上櫃指數股票型基金(ETF)
光電業
其他
其他電子業
其他電子類
創新板股票
創新版股票
化學工業
化學生技醫療
半導體業
受益證券
塑膠工業
大盤
存託憑證
居家生活
居家生活類
建材營造
所有證券
指數投資證券(ETN)
數位雲端
數位雲端類
文化創意業
橡膠工業
水泥工業
汽車工業
油電燃氣業
玻璃陶瓷
生技醫療業
紡織纖維
綠能環保
綠能環保類
航運業
觀光事業
觀光餐旅
貿易百貨
資訊服務業
農業科技
農業科技業
通信網路業
造紙工業
運動休閒
運動休閒類
金融保險
金融業
鋼鐵工業
電器電纜
電子商務業
電子工業
電子通路業
電子零組件業
電機機械
電腦及週邊設備業
食品工業

總共有 56 個產業指數


In [26]:
# 讀取industry_index.csv
industry_index_path = r"D:\Min\Python\Project\FA_Data\meta_data\industry_index.csv"
industry_df = pd.read_csv(industry_index_path)

# 顯示所有唯一的產業指數名稱
print("Industry Index中的產業類別:")
unique_industries = industry_df['指數名稱'].unique()
print("\n".join(sorted(unique_industries)))

# 計算產業指數數量
print(f"\n總共有 {len(unique_industries)} 個產業指數")

Industry Index中的產業類別:
光電類報酬指數
光電類指數
其他電子類報酬指數
其他電子類指數
其他類報酬指數
其他類指數
化學生技醫療類報酬指數
化學生技醫療類指數
化學類報酬指數
化學類指數
半導體類報酬指數
半導體類指數
塑膠化工類指數
塑膠類報酬指數
塑膠類指數
居家生活類報酬指數
居家生活類指數
建材營造類報酬指數
建材營造類指數
數位雲端類報酬指數
數位雲端類指數
機電類指數
橡膠類報酬指數
橡膠類指數
水泥窯製類指數
水泥類報酬指數
水泥類指數
汽車類報酬指數
汽車類指數
油電燃氣類報酬指數
油電燃氣類指數
玻璃陶瓷類報酬指數
玻璃陶瓷類指數
生技醫療類報酬指數
生技醫療類指數
紡織纖維類報酬指數
紡織纖維類指數
綠能環保類報酬指數
綠能環保類指數
航運類報酬指數
航運類指數
觀光餐旅類報酬指數
觀光餐旅類指數
貿易百貨類報酬指數
貿易百貨類指數
資訊服務類報酬指數
資訊服務類指數
通信網路類報酬指數
通信網路類指數
造紙類報酬指數
造紙類指數
運動休閒類報酬指數
運動休閒類指數
金融保險類報酬指數
金融保險類指數
金融類日報酬兩倍指數
金融類日報酬反向一倍指數
鋼鐵類報酬指數
鋼鐵類指數
電器電纜類報酬指數
電器電纜類指數
電子工業類報酬指數
電子工業類指數
電子通路類報酬指數
電子通路類指數
電子零組件類報酬指數
電子零組件類指數
電子類兩倍槓桿指數
電子類反向指數
電機機械類報酬指數
電機機械類指數
電腦及週邊設備類報酬指數
電腦及週邊設備類指數
食品類報酬指數
食品類指數

總共有 75 個產業指數


In [24]:
import os
import time
import logging
import pandas as pd
import numpy as np
import requests
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict
from tqdm import tqdm

class TWMarketDataProcessor:
    def __init__(self, date_str=None):
        self.date_str = date_str or datetime.now().strftime('%Y%m%d')
        self.logger = logging.getLogger(__name__)
        self.base_path = Path("D:/Min/Python/Project/FA_Data")
        self.meta_data_path = self.base_path / "meta_data"
        
        # 確保目錄存在
        self.meta_data_path.mkdir(parents=True, exist_ok=True)
        
        # 設定日誌
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('market_data_process.log', encoding='utf-8'),
                logging.StreamHandler()
            ]
        )

    def _make_request(self, url: str) -> Optional[requests.Response]:
        """發送HTTP請求並處理重試邏輯"""
        max_retries = 3
        retry_delay = 5  # 秒
        
        for attempt in range(max_retries):
            try:
                response = requests.get(url, timeout=30)
                if response.status_code == 200:
                    return response
                elif response.status_code == 429:  # Too Many Requests
                    time.sleep(retry_delay * (attempt + 1))
                    continue
            except requests.RequestException as e:
                self.logger.warning(f"請求失敗 (嘗試 {attempt + 1}/{max_retries}): {str(e)}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                continue
        
        return None

    def extract_industry_category_data_for_date(self, date_str: str) -> Optional[List[Dict]]:
        """擷取特定日期的產業類股指數資料（包含價格指數和報酬指數）"""
        url = f'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date={date_str}&type=ALL&response=json'
        
        response = self._make_request(url)
        if response is None:
            return None
    
        try:
            data = response.json()
            
            if 'tables' not in data or not data['tables']:
                self.logger.warning(f"日期 {date_str} 未擷取到任何產業指數")
                return None
    
            index_data = []
            # 尋找包含產業類指數的表格
            for table in data['tables']:
                if '價格指數' in table.get('title', '') or '報酬指數' in table.get('title', ''):
                    for row in table['data']:
                        name = row[0].strip()
                        # 處理包含「類指數」和「類報酬指數」
                        if '類' in name and ('指數' in name or '類報酬' in name):
                            try:
                                # 處理數值
                                close_price = float(str(row[1]).replace(',', ''))
                                change = row[2].replace('<p style =\'color:red\'>+</p>', '+').replace('<p style =\'color:green\'>-</p>', '-')
                                change_price = float(str(row[3]).replace(',', '')) if row[3] != '--' else 0.0
                                change_percent = float(str(row[4]).replace(',', '')) if row[4] != '--' else 0.0
                                
                                index_data.append({
                                    '指數名稱': name,
                                    '收盤指數': close_price,
                                    '漲跌': change,
                                    '漲跌點數': change_price,
                                    '漲跌百分比': change_percent,
                                    '日期': datetime.strptime(date_str, '%Y%m%d').strftime('%Y-%m-%d')
                                })
                            except (ValueError, IndexError) as e:
                                self.logger.warning(f"處理產業指數行資料時發生錯誤: {str(e)}, Row: {row}")
                                continue
    
            if index_data:
                self.logger.debug(f"日期 {date_str} 抓取到 {len(index_data)} 個類股指數")
                self.logger.info(f"已取得 {len([x for x in index_data if '報酬' in x['指數名稱']])} 個類報酬指數")
                self.logger.info(f"已取得 {len([x for x in index_data if '報酬' not in x['指數名稱']])} 個類指數")
            return index_data
    
        except Exception as e:
            self.logger.error(f"處理 {date_str} 時發生錯誤: {str(e)}")
            return None

    def process_industry_category_data(self) -> Optional[pd.DataFrame]:
        """處理產業類股指數數據"""
        try:
            self.logger.info(f"開始處理產業類股指數數據: {self.date_str}")
            
            # 設定檔案路徑
            industry_category_file = self.meta_data_path / 'industry_index.csv'
            
            # 讀取現有數據
            existing_df = pd.DataFrame()
            existing_dates = set()
            if industry_category_file.exists():
                existing_df = pd.read_csv(industry_category_file)
                existing_dates = set(existing_df['日期'].unique())
                self.logger.info(f"已讀取現有數據，共 {len(existing_df)} 筆記錄")
    
            # 直接使用 self.date_str
            index_data = self.extract_industry_category_data_for_date(self.date_str)
            
            if not index_data:
                self.logger.info("沒有新的產業類股指數數據需要處理")
                return existing_df if not existing_df.empty else None
    
            # 處理新數據
            new_df = pd.DataFrame(index_data)
            if not existing_df.empty:
                df = pd.concat([existing_df, new_df], ignore_index=True)
                df = df.drop_duplicates(subset=['日期', '指數名稱'], keep='last')
            else:
                df = new_df
    
            # 排序和保存
            df = df.sort_values(['指數名稱', '日期'])
            df.to_csv(industry_category_file, index=False, encoding='utf-8-sig')
            
            self.logger.info(f"產業類股指數數據已更新並保存到: {industry_category_file}")
            return df
                
        except Exception as e:
            self.logger.error(f"處理產業類股指數數據時發生錯誤: {str(e)}")
            raise

# 主程式執行
if __name__ == "__main__":
    try:
        processor = TWMarketDataProcessor(date_str='20241122')  # 指定要查詢的日期
        df = processor.process_industry_category_data()
        if df is not None:
            print(f"成功處理產業類股指數數據，共 {len(df)} 筆記錄")
        else:
            print("沒有新的數據需要處理")
    except Exception as e:
        print(f"執行過程中發生錯誤: {str(e)}")

2024-11-25 14:18:11,489 - INFO - 開始處理產業類股指數數據: 20241122
2024-11-25 14:18:20,186 - INFO - 已取得 36 個類報酬指數
2024-11-25 14:18:20,187 - INFO - 已取得 39 個類指數
2024-11-25 14:18:20,206 - INFO - 產業類股指數數據已更新並保存到: D:\Min\Python\Project\FA_Data\meta_data\industry_index.csv


成功處理產業類股指數數據，共 75 筆記錄


In [None]:
def fix_price_change_column(file_path):
    """修復既有檔案中的漲跌符號"""
    df = pd.read_csv(file_path, dtype={'證券代號': str}, low_memory=False)
    
    if '漲跌(+/-)' in df.columns:
        df['漲跌(+/-)'] = df['漲跌(+/-)'].apply(lambda x: '+' if 'color:red' in str(x) else 
                                                       '-' if 'color:green' in str(x) else 
                                                       '')
    
    df.to_csv(file_path, index=False, encoding='utf-8-sig')

fix_price_change_column('D:/Min/Python/Project/FA_Data/meta_data/stock_data_whole.csv')

In [None]:
# 檢查證交所資料結構
import requests
import json

def check_twse_data_structure():
    url = 'https://www.twse.com.tw/rwd/zh/afterTrading/MI_INDEX?date=20241205&type=ALL&response=json'
    
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        
        # 印出整體結構
        print("=== 資料結構 ===")
        for key in data:
            if key == 'tables':
                print(f"\n資料表數量: {len(data['tables'])}")
                print("\n每個資料表的標題:")
                for i, table in enumerate(data['tables']):
                    print(f"第 {i} 個資料表: {table.get('title', '無標題')}")
                
                # 特別檢查第8個資料表（應該是個股資料）
                if len(data['tables']) > 8:
                    stock_table = data['tables'][8]
                    print(f"\n=== 第8個資料表詳細資訊 ===")
                    print(f"標題: {stock_table.get('title', '無標題')}")
                    print(f"欄位: {stock_table.get('fields', [])}")
                    print("\n前三筆資料範例:")
                    for row in stock_table.get('data', [])[:3]:
                        print(row)
            else:
                print(f"{key}: {data[key]}")
    else:
        print(f"請求失敗: {response.status_code}")

check_twse_data_structure()

In [4]:
import pandas as pd
from pathlib import Path
from datetime import datetime
import numpy as np

def check_stock_update_dates():
    """檢查每支股票的最新更新日期"""
    try:
        # 設定檔案路徑
        base_path = Path("D:/Min/Python/Project/FA_Data")
        file_path = base_path / 'meta_data' / 'stock_data_whole.csv'
        
        # 讀取股票資料
        df = pd.read_csv(file_path, dtype={'證券代號': str})
        
        # 將日期轉換為datetime格式
        df['日期'] = pd.to_datetime(df['日期'])
        
        # 獲取每支股票的最新日期
        latest_dates = df.groupby('證券代號').agg({
            '日期': 'max',
            '證券名稱': 'first'
        }).reset_index()
        
        # 計算與最新日期的差距
        overall_latest = latest_dates['日期'].max()
        latest_dates['差距天數'] = (overall_latest - latest_dates['日期']).dt.days
        
        # 排序並輸出結果
        latest_dates = latest_dates.sort_values('差距天數', ascending=False)
        
        # 生成報告
        print(f"檢查時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"資料庫最新日期: {overall_latest.strftime('%Y-%m-%d')}")
        print(f"總股票數量: {len(latest_dates)}")
        
        # 輸出未更新天數超過5天的股票
        outdated = latest_dates[latest_dates['差距天數'] > 5]
        if not outdated.empty:
            print("\n更新落後超過5天的股票:")
            for _, row in outdated.iterrows():
                print(f"股票代號: {row['證券代號']}, "
                      f"名稱: {row['證券名稱']}, "
                      f"最後更新: {row['日期'].strftime('%Y-%m-%d')}, "
                      f"落後天數: {row['差距天數']}天")
        
        # 基本統計信息
        print(f"\n基本統計:")
        print(f"平均落後天數: {latest_dates['差距天數'].mean():.1f}天")
        print(f"最大落後天數: {latest_dates['差距天數'].max()}天")
        print(f"完全更新到最新日期的股票數: {len(latest_dates[latest_dates['差距天數'] == 0])}")
        
        return latest_dates
        
    except Exception as e:
        print(f"檢查過程中發生錯誤: {str(e)}")
        return None

# 執行檢查
result_df = check_stock_update_dates()

檢查時間: 2024-12-11 13:56:51
資料庫最新日期: 2024-12-11
總股票數量: 1154

更新落後超過5天的股票:
股票代號: 911201, 名稱: 僑威控, 最後更新: 2014-05-05, 落後天數: 3873天
股票代號: 910948, 名稱: 融達, 最後更新: 2014-07-03, 落後天數: 3814天
股票代號: 2837, 名稱: 萬泰銀, 最後更新: 2014-09-01, 落後天數: 3754天
股票代號: 3638, 名稱: F-IML, 最後更新: 2014-09-09, 落後天數: 3746天
股票代號: 2384, 名稱: 勝華, 最後更新: 2014-11-18, 落後天數: 3676天
股票代號: 5280, 名稱: F-敦泰, 最後更新: 2014-12-19, 落後天數: 3645天
股票代號: 3061, 名稱: 璨圓, 最後更新: 2014-12-23, 落後天數: 3641天
股票代號: 910069, 名稱: 新曄, 最後更新: 2015-05-12, 落後天數: 3501天
股票代號: 911609, 名稱: 揚子江, 最後更新: 2015-06-16, 落後天數: 3466天
股票代號: 2833A, 名稱: 台壽甲, 最後更新: 2015-10-01, 落後天數: 3359天
股票代號: 2833, 名稱: 台壽保, 最後更新: 2015-10-01, 落後天數: 3359天
股票代號: 2361, 名稱: 鴻友, 最後更新: 2015-12-25, 落後天數: 3274天
股票代號: 913889, 名稱: 大成糖, 最後更新: 2015-12-28, 落後天數: 3271天
股票代號: 2891A, 名稱: 中信特, 最後更新: 2016-01-18, 落後天數: 3250天
股票代號: 911612, 名稱: 滬安, 最後更新: 2016-01-25, 落後天數: 3243天
股票代號: 2847, 名稱: 大眾銀, 最後更新: 2016-03-09, 落後天數: 3199天
股票代號: 3584, 名稱: 介面, 最後更新: 2016-04-06, 落後天數: 3171天
股票代號: 6286, 名稱: 立錡, 最後更新: 2016-04-20, 落後天數: 3157天
股

In [6]:
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
import numpy as np

def detailed_stock_analysis():
    """詳細的股票資料分析"""
    try:
        base_path = Path("D:/Min/Python/Project/FA_Data")
        stock_file = base_path / 'meta_data' / 'stock_data_whole.csv'
        tech_path = base_path / 'technical_analysis'
        
        # 讀取股票資料
        df = pd.read_csv(stock_file, dtype={
            '證券代號': str,
            '證券名稱': str,
            '日期': str
        }, low_memory=False)
        
        # 1. 檢查資料量少的股票
        stock_counts = df.groupby(['證券代號', '證券名稱']).agg({
            '日期': ['count', 'min', 'max']
        }).reset_index()
        stock_counts.columns = ['證券代號', '證券名稱', '資料筆數', '開始日期', '最後日期']
        low_data_stocks = stock_counts[stock_counts['資料筆數'] < 50].sort_values('資料筆數')
        
        print("\n=== 資料筆數少於50的股票 ===")
        print(f"共有 {len(low_data_stocks)} 支股票")
        for _, row in low_data_stocks.iterrows():
            print(f"代號: {row['證券代號']}, 名稱: {row['證券名稱']}, "
                  f"資料筆數: {row['資料筆數']}, "
                  f"資料區間: {row['開始日期']} 到 {row['最後日期']}")
        
        # 2. 檢查技術分析檔案
        print("\n=== 技術分析檔案檢查 ===")
        if not tech_path.exists():
            print("technical_analysis 目錄不存在")
            return
            
        tech_files = list(tech_path.glob('*_indicators.csv'))
        print(f"技術分析檔案總數: {len(tech_files)}")
        
        # 檢查技術分析檔案的更新狀態
        tech_dates = []
        for file in tech_files[:5]:  # 檢查前5個檔案作為範例
            try:
                tech_df = pd.read_csv(file)
                stock_id = file.stem.split('_')[0]
                latest_date = tech_df['日期'].max()
                tech_dates.append({
                    '股票代號': stock_id,
                    '最新日期': latest_date,
                    '資料筆數': len(tech_df)
                })
            except Exception as e:
                print(f"處理檔案 {file.name} 時發生錯誤: {str(e)}")
        
        if tech_dates:
            print("\n技術分析檔案範例:")
            for info in tech_dates:
                print(f"股票 {info['股票代號']}: 最新日期 {info['最新日期']}, 資料筆數 {info['資料筆數']}")
        
        return low_data_stocks, tech_dates
        
    except Exception as e:
        print(f"分析過程中發生錯誤: {str(e)}")
        return None, None

# 執行詳細分析
low_data_stocks, tech_dates = detailed_stock_analysis()


=== 資料筆數少於50的股票 ===
共有 14 支股票
代號: 8045, 名稱: 達運光電, 資料筆數: 1, 資料區間: 2024-12-11 到 2024-12-11
代號: 6955, 名稱: 邦睿生技-創, 資料筆數: 4, 資料區間: 2024-12-06 到 2024-12-11
代號: 7722, 名稱: LINEPAY, 資料筆數: 5, 資料區間: 2024-12-05 到 2024-12-11
代號: 6924, 名稱: 榮惠-KY創, 資料筆數: 7, 資料區間: 2024-12-03 到 2024-12-11
代號: 6757, 名稱: 台灣虎航, 資料筆數: 12, 資料區間: 2024-11-25 到 2024-12-11
代號: 6962, 名稱: ITH-KY, 資料筆數: 12, 資料區間: 2024-11-26 到 2024-12-11
代號: 7705, 名稱: 三商餐飲, 資料筆數: 12, 資料區間: 2024-11-26 到 2024-12-11
代號: 911201, 名稱: 僑威控, 資料筆數: 20, 資料區間: 2014-04-07 到 2014-05-05
代號: 8476, 名稱: 台境*, 資料筆數: 23, 資料區間: 2024-11-11 到 2024-12-11
代號: 2646, 名稱: 星宇航空, 資料筆數: 33, 資料區間: 2024-10-25 到 2024-12-11
代號: 6862, 名稱: 三集瑞-KY, 資料筆數: 35, 資料區間: 2024-10-23 到 2024-12-11
代號: 6988, 名稱: 威力暘-創, 資料筆數: 35, 資料區間: 2024-10-23 到 2024-12-11
代號: 6919, 名稱: 康霈*, 資料筆數: 47, 資料區間: 2024-10-04 到 2024-12-11
代號: 2897B, 名稱: 王道銀乙特, 資料筆數: 49, 資料區間: 2024-09-30 到 2024-12-11

=== 技術分析檔案檢查 ===
技術分析檔案總數: 1134

技術分析檔案範例:
股票 1101B: 最新日期 2024-11-27, 資料筆數 1415
股票 1101: 最新日期 2024-12-10, 資料筆數 2611
股票 

In [5]:
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
import logging

def diagnose_stock_data():
    """診斷股票資料狀態"""
    # 設定日誌
    logging.basicConfig(level=logging.INFO,
                       format='%(asctime)s - %(levelname)s - %(message)s')
    
    try:ㄎ
        # 1. 檢查路徑與檔案
        base_path = Path("D:/Min/Python/Project/FA_Data")
        meta_data_path = base_path / 'meta_data'
        files_to_check = {
            '大盤指數': meta_data_path / 'market_idex.csv',
            '產業指數': meta_data_path / 'industry_index.csv',
            '股票資料': meta_data_path / 'stock_data_whole.csv'
        }
        
        logging.info("開始診斷股票資料系統")
        logging.info(f"基礎路徑: {base_path}")
        
        # 2. 檢查檔案存在性和大小
        for name, file_path in files_to_check.items():
            if file_path.exists():
                size_mb = file_path.stat().st_size / (1024 * 1024)
                logging.info(f"{name}檔案存在，大小: {size_mb:.2f} MB")
                
                # 讀取並檢查資料
                if name == '大盤指數':
                    df = pd.read_csv(file_path)
                    logging.info(f"{name}最新日期: {df['Date'].max()}")
                    logging.info(f"{name}資料筆數: {len(df)}")
                else:
                    df = pd.read_csv(file_path, dtype={'證券代號': str} if name == '股票資料' else None)
                    logging.info(f"{name}最新日期: {df['日期'].max()}")
                    logging.info(f"{name}資料筆數: {len(df)}")
                    
                    if name == '股票資料':
                        # 檢查股票數據的完整性
                        stock_counts = df.groupby('證券代號').size()
                        logging.info(f"股票總數: {len(stock_counts)}")
                        logging.info(f"每支股票平均資料筆數: {stock_counts.mean():.2f}")
                        logging.info(f"資料筆數最少的股票數: {len(stock_counts[stock_counts < 50])}")
            else:
                logging.error(f"{name}檔案不存在: {file_path}")
        
        # 3. 建議更新方式
        logging.info("\n建議更新方式：")
        logging.info("1. 先執行個股數據更新：")
        logging.info("   processor = TWMarketDataProcessor()")
        logging.info("   processor.process_daily_stock_data()")
        logging.info("2. 再更新產業指數：")
        logging.info("   processor.process_industry_index_data()")
        logging.info("3. 最後更新大盤指數：")
        logging.info("   processor.update_market_index()")
        
    except Exception as e:
        logging.error(f"診斷過程發生錯誤: {str(e)}")

def update_stock_data():
    """分步驟更新股票資料"""
    try:
        # 建立處理器
        processor = TWMarketDataProcessor(date_range=MarketDateRange.last_n_days(100))
        
        # 1. 更新個股資料
        logging.info("開始更新個股資料...")
        stock_data = processor.process_daily_stock_data()
        if stock_data is not None:
            logging.info(f"個股資料更新完成，最新日期: {stock_data['日期'].max()}")
        
        # 2. 更新產業指數
        logging.info("開始更新產業指數...")
        industry_data = processor.process_industry_index_data()
        if industry_data is not None:
            logging.info(f"產業指數更新完成，最新日期: {industry_data['日期'].max()}")
        
        # 3. 更新大盤指數
        logging.info("開始更新大盤指數...")
        market_data = processor.update_market_index()
        if market_data is not None:
            logging.info(f"大盤指數更新完成，最新日期: {market_data.index.max()}")
            
        logging.info("所有更新完成")
        
    except Exception as e:
        logging.error(f"更新過程發生錯誤: {str(e)}")

# 執行診斷
diagnose_stock_data()

2024-12-10 14:25:51,595 - INFO - 開始診斷股票資料系統
2024-12-10 14:25:51,595 - INFO - 基礎路徑: D:\Min\Python\Project\FA_Data
2024-12-10 14:25:51,596 - INFO - 大盤指數檔案存在，大小: 0.25 MB
2024-12-10 14:25:51,628 - INFO - 大盤指數最新日期: 2024-12-09
2024-12-10 14:25:51,629 - INFO - 大盤指數資料筆數: 2662
2024-12-10 14:25:51,630 - INFO - 產業指數檔案存在，大小: 2.53 MB
2024-12-10 14:25:51,741 - INFO - 產業指數最新日期: 2024-12-10
2024-12-10 14:25:51,742 - INFO - 產業指數資料筆數: 48317
2024-12-10 14:25:51,743 - INFO - 股票資料檔案存在，大小: 223.11 MB
  df = pd.read_csv(file_path, dtype={'證券代號': str} if name == '股票資料' else None)
2024-12-10 14:25:57,065 - INFO - 股票資料最新日期: 2024-12-10
2024-12-10 14:25:57,066 - INFO - 股票資料資料筆數: 2191428
2024-12-10 14:25:57,159 - INFO - 股票總數: 1112
2024-12-10 14:25:57,160 - INFO - 每支股票平均資料筆數: 1970.71
2024-12-10 14:25:57,162 - INFO - 資料筆數最少的股票數: 128
2024-12-10 14:25:57,162 - INFO - 
建議更新方式：
2024-12-10 14:25:57,163 - INFO - 1. 先執行個股數據更新：
2024-12-10 14:25:57,163 - INFO -    processor = TWMarketDataProcessor()
2024-12-10 14:25:57,164 - I

In [3]:
import pandas as pd
import os
from datetime import datetime
from pathlib import Path
import logging

# 設定日誌
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def integrate_daily_data():
    """整合daily_price資料夾中的所有數據到stock_data_whole"""
    try:
        # 設定路徑
        base_path = Path("D:/Min/Python/Project/FA_Data")
        daily_price_path = base_path / "daily_price"
        stock_data_path = base_path / "meta_data/stock_data_whole.csv"
        backup_path = base_path / "meta_data/backup"
        
        # 確保備份目錄存在
        backup_path.mkdir(parents=True, exist_ok=True)
        
        # 讀取現有的stock_data_whole
        logger.info("讀取現有的stock_data_whole檔案")
        try:
            existing_df = pd.read_csv(
                stock_data_path, 
                dtype={'證券代號': str, '證券名稱': str},
                low_memory=False
            )
            existing_df['日期'] = pd.to_datetime(existing_df['日期']).dt.strftime('%Y-%m-%d')
            existing_dates = set(existing_df['日期'].unique())
            logger.info(f"現有資料日期範圍: {min(existing_dates)} 到 {max(existing_dates)}")
        except FileNotFoundError:
            existing_df = pd.DataFrame()
            existing_dates = set()
            logger.info("未找到現有的stock_data_whole檔案，將創建新檔案")
        
        # 備份現有檔案
        if stock_data_path.exists():
            backup_time = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_file = backup_path / f"stock_data_whole_{backup_time}.csv"
            existing_df.to_csv(backup_file, index=False, encoding='utf-8-sig')
            logger.info(f"已建立備份檔案: {backup_file}")
        
        # 讀取daily_price資料夾中的所有檔案
        new_data_list = []
        logger.info("開始處理daily_price資料夾中的檔案")
        
        for file_name in os.listdir(daily_price_path):
            if file_name.endswith('.csv'):
                file_path = daily_price_path / file_name
                try:
                    # 從檔名獲取日期
                    date_str = datetime.strptime(
                        file_name.split('.')[0], 
                        "%Y%m%d"
                    ).strftime("%Y-%m-%d")
                    
                    # 讀取數據
                    df = pd.read_csv(file_path)
                    df['日期'] = date_str
                    df['證券代號'] = df['證券代號'].astype(str)
                    
                    # 處理數值欄位
                    numeric_columns = ['成交股數', '成交筆數', '成交金額', '開盤價', 
                                     '最高價', '最低價', '收盤價', '漲跌價差', 
                                     '最後揭示買價', '最後揭示買量', 
                                     '最後揭示賣價', '最後揭示賣量', '本益比']
                    
                    for col in numeric_columns:
                        if col in df.columns:
                            df[col] = df[col].replace({'--': None, '': None})
                            df[col] = pd.to_numeric(df[col].astype(str).str.replace(',', ''), 
                                                  errors='coerce')
                    
                    new_data_list.append(df)
                    logger.info(f"成功處理 {file_name}")
                    
                except Exception as e:
                    logger.error(f"處理 {file_name} 時發生錯誤: {str(e)}")
        
        # 合併所有新數據
        if new_data_list:
            logger.info("合併所有新數據")
            new_data = pd.concat(new_data_list, ignore_index=True)
            
            # 合併新舊數據
            if not existing_df.empty:
                logger.info("合併新舊數據")
                # 移除重複的日期資料
                existing_df = existing_df[~existing_df['日期'].isin(new_data['日期'].unique())]
                final_df = pd.concat([existing_df, new_data], ignore_index=True)
            else:
                final_df = new_data
            
            # 排序
            final_df = final_df.sort_values(['證券代號', '日期'])
            
            # 保存結果
            final_df.to_csv(stock_data_path, index=False, encoding='utf-8-sig')
            
            logger.info("資料整合完成")
            logger.info(f"最終資料日期範圍: {final_df['日期'].min()} 到 {final_df['日期'].max()}")
            logger.info(f"總資料筆數: {len(final_df):,d}")
            logger.info(f"股票數量: {len(final_df['證券代號'].unique()):,d}")
            
        else:
            logger.warning("未發現新的資料需要整合")
            
    except Exception as e:
        logger.error(f"整合過程發生錯誤: {str(e)}")

if __name__ == "__main__":
    integrate_daily_data()

2024-12-11 13:56:36,889 - INFO - 資料整合完成
2024-12-11 13:56:37,521 - INFO - 最終資料日期範圍: 2014-04-07 到 2024-12-11
2024-12-11 13:56:37,523 - INFO - 總資料筆數: 2,498,368
2024-12-11 13:56:38,061 - INFO - 股票數量: 1,154


In [4]:
def check_raw_data(base_path="D:/Min/Python/Project/FA_Data"):
    """
    檢查原始資料的完整性
    
    Args:
        base_path: 基礎資料路徑，預設為"D:/Min/Python/Project/FA_Data"
    """
    import pandas as pd
    from pathlib import Path
    import os
    
    # 先檢查路徑是否存在
    base_path = Path(base_path)
    if not base_path.exists():
        print(f"錯誤: 基礎路徑 {base_path} 不存在")
        return
        
    print(f"使用基礎路徑: {base_path}")
    
    # 檢查必要的子目錄
    required_dirs = {
        "technical_analysis": base_path / "technical_analysis",
        "meta_data": base_path / "meta_data"
    }
    
    print("\n=== 目錄檢查 ===")
    for dir_name, dir_path in required_dirs.items():
        if dir_path.exists():
            print(f"{dir_name}: 存在 ({dir_path})")
        else:
            print(f"{dir_name}: 不存在 ({dir_path})")
            
    # 原本的檢查邏輯...
    test_stocks = ['2330', '2317', '1101', '2891', '2303']
    
    print("\n=== 開始檢查資料完整性 ===")
    
    # 1. 技術指標檢查
    tech_path = required_dirs["technical_analysis"]
    if not tech_path.exists():
        print("\n1. 技術指標目錄不存在")
    else:
        print("\n1. 技術指標檢查:")
        for stock_id in test_stocks:
            file_path = tech_path / f"{stock_id}_indicators.csv"
            print(f"\n股票 {stock_id}:")
            if not file_path.exists():
                print(f"  - 找不到技術指標檔案 ({file_path})")
            else:
                try:
                    df = pd.read_csv(file_path)
                    print(f"  - 檔案大小: {os.path.getsize(file_path) / 1024:.2f} KB")
                    print(f"  - 資料筆數: {len(df)}")
                    print(f"  - 欄位數量: {len(df.columns)}")
                    print("  - 欄位檢查:")
                    for col in df.columns:
                        valid_ratio = df[col].notna().mean() * 100
                        print(f"    * {col}: {valid_ratio:.2f}% 有效資料")
                except Exception as e:
                    print(f"  - 讀取錯誤: {str(e)}")
    
    # 2. 產業指數檢查
    meta_path = required_dirs["meta_data"]
    industry_file = meta_path / "industry_index.csv"
    print("\n2. 產業指數檢查:")
    if not meta_path.exists():
        print("meta_data 目錄不存在")
    elif not industry_file.exists():
        print(f"找不到產業指數檔案 ({industry_file})")
    else:
        try:
            df = pd.read_csv(industry_file)
            print(f"  - 檔案大小: {os.path.getsize(industry_file) / 1024:.2f} KB")
            print(f"  - 資料筆數: {len(df)}")
            print(f"  - 欄位列表: {', '.join(df.columns)}")
            for col in df.columns:
                valid_ratio = df[col].notna().mean() * 100
                print(f"  - {col}: {valid_ratio:.2f}% 有效資料")
        except Exception as e:
            print(f"  - 讀取錯誤: {str(e)}")

In [5]:
# 使用預設路徑
check_raw_data()

# 或指定自訂路徑
check_raw_data("D:/Min/Python/Project/FA_Data")

使用基礎路徑: D:\Min\Python\Project\FA_Data

=== 目錄檢查 ===
technical_analysis: 存在 (D:\Min\Python\Project\FA_Data\technical_analysis)
meta_data: 存在 (D:\Min\Python\Project\FA_Data\meta_data)

=== 開始檢查資料完整性 ===

1. 技術指標檢查:

股票 2330:
  - 檔案大小: 831.86 KB
  - 資料筆數: 2627
  - 欄位數量: 29
  - 欄位檢查:
    * 證券代號: 100.00% 有效資料
    * 證券名稱: 100.00% 有效資料
    * 成交股數: 100.00% 有效資料
    * 成交筆數: 100.00% 有效資料
    * 成交金額: 100.00% 有效資料
    * 開盤價: 100.00% 有效資料
    * 最高價: 100.00% 有效資料
    * 最低價: 100.00% 有效資料
    * 收盤價: 100.00% 有效資料
    * 漲跌(+/-): 100.00% 有效資料
    * 漲跌價差: 100.00% 有效資料
    * 最後揭示買價: 100.00% 有效資料
    * 最後揭示買量: 100.00% 有效資料
    * 最後揭示賣價: 100.00% 有效資料
    * 最後揭示賣量: 100.00% 有效資料
    * 本益比: 100.00% 有效資料
    * 日期: 100.00% 有效資料
    * SMA30: 100.00% 有效資料
    * DEMA30: 100.00% 有效資料
    * EMA30: 100.00% 有效資料
    * RSI: 100.00% 有效資料
    * MACD: 100.00% 有效資料
    * MACD_signal: 100.00% 有效資料
    * MACD_hist: 100.00% 有效資料
    * slowk: 100.00% 有效資料
    * slowd: 100.00% 有效資料
    * TSF: 100.00% 有效資料
    * middleband: 100.00

In [6]:
import os
import pandas as pd
import json
from pathlib import Path
from datetime import datetime

class IndustryDataValidator:
    def __init__(self, base_dir="D:/Min/Python/Project/FA_Data"):
        self.base_dir = Path(base_dir)
        self.meta_data_dir = self.base_dir / "meta_data"
        self.industry_analysis_dir = self.base_dir / "industry_analysis"
        self.industry_correlation_dir = self.base_dir / "industry_correlation"
        
    def validate_all(self):
        print("開始驗證產業相關資料...")
        print("\n1. 檢查必要檔案存在性:")
        self._check_required_files()
        
        print("\n2. 驗證產業分類對應:")
        self._validate_industry_mapping()
        
        print("\n3. 檢查產業指數資料:")
        self._validate_industry_index()
        
        print("\n4. 檢查產業分析檔案:")
        self._check_industry_analysis_files()
        
    def _check_required_files(self):
        required_files = {
            "companies_final.csv": self.meta_data_dir,
            "industry_index.csv": self.meta_data_dir,
        }
        
        for file, directory in required_files.items():
            path = directory / file
            if path.exists():
                file_size = path.stat().st_size / (1024 * 1024)  # Convert to MB
                print(f"✓ {file} 存在 (大小: {file_size:.2f}MB)")
                
                # 讀取檔案前幾行以驗證格式
                try:
                    df = pd.read_csv(path, nrows=5)
                    print(f"  欄位: {', '.join(df.columns)}")
                except Exception as e:
                    print(f"  ⚠️ 檔案格式可能有問題: {str(e)}")
            else:
                print(f"✗ {file} 不存在於 {directory}")
                
    def _validate_industry_mapping(self):
        try:
            # 讀取產業分類對應
            mapping_file = self.meta_data_dir / "companies_final.csv"
            df = pd.read_csv(mapping_file, dtype={'stock_id': str})
            
            # 基本統計
            total_stocks = len(df)
            unique_industries = df['industry_category'].unique()
            
            print(f"總股票數: {total_stocks}")
            print(f"產業類別數: {len(unique_industries)}")
            print("\n產業分布:")
            industry_dist = df['industry_category'].value_counts()
            for ind, count in industry_dist.items():
                print(f"{ind}: {count} 檔股票 ({count/total_stocks*100:.1f}%)")
                
            # 檢查測試標的的產業分類
            test_stocks = ['2330', '2317', '1101', '2891', '2303']
            print("\n測試股票的產業分類:")
            test_stocks_df = df[df['stock_id'].isin(test_stocks)]
            for _, row in test_stocks_df.iterrows():
                print(f"{row['stock_id']}: {row['industry_category']}")
                
        except Exception as e:
            print(f"驗證產業分類對應時發生錯誤: {str(e)}")
            
    def _validate_industry_index(self):
        try:
            index_file = self.meta_data_dir / "industry_index.csv"
            df = pd.read_csv(index_file)
            
            # 基本資訊
            print(f"資料期間: {df['日期'].min()} 到 {df['日期'].max()}")
            unique_indices = df['指數名稱'].unique()
            print(f"指數數量: {len(unique_indices)}")
            
            # 檢查資料完整性
            print("\n指數資料完整性檢查:")
            for index_name in unique_indices[:5]:  # 只顯示前5個
                index_data = df[df['指數名稱'] == index_name]
                dates_count = len(index_data)
                null_count = index_data['收盤指數'].isnull().sum()
                print(f"{index_name}:")
                print(f"  - 資料筆數: {dates_count}")
                print(f"  - 缺失值數: {null_count}")
                if dates_count > 0:
                    print(f"  - 最新收盤: {index_data['收盤指數'].iloc[-1]:.2f}")
                    
        except Exception as e:
            print(f"驗證產業指數資料時發生錯誤: {str(e)}")
            
    def _check_industry_analysis_files(self):
        analysis_dirs = {
            "return_index": self.industry_analysis_dir / "return_index",
            "price_index": self.industry_analysis_dir / "price_index"
        }
        
        for dir_name, dir_path in analysis_dirs.items():
            print(f"\n檢查 {dir_name} 目錄:")
            if not dir_path.exists():
                print(f"✗ 目錄不存在: {dir_path}")
                continue
                
            json_files = list(dir_path.glob("*.json"))
            print(f"找到 {len(json_files)} 個JSON檔案")
            
            if json_files:
                # 檢查最新的幾個文件
                print("最新文件詳情:")
                sorted_files = sorted(json_files, key=lambda x: x.stat().st_mtime, reverse=True)
                for file in sorted_files[:3]:
                    modified_time = datetime.fromtimestamp(file.stat().st_mtime)
                    file_size = file.stat().st_size / 1024  # Convert to KB
                    print(f"\n{file.name}")
                    print(f"  修改時間: {modified_time}")
                    print(f"  檔案大小: {file_size:.1f}KB")
                    
                    # 嘗試讀取並驗證JSON內容
                    try:
                        with open(file, 'r', encoding='utf-8') as f:
                            data = json.load(f)
                            # 檢查關鍵部分是否存在
                            sections = ['basic_info', 'time_series_analysis', 'risk_analysis']
                            missing_sections = [s for s in sections if s not in data]
                            if missing_sections:
                                print(f"  ⚠️ 缺少區段: {', '.join(missing_sections)}")
                            else:
                                print("  ✓ JSON結構完整")
                    except Exception as e:
                        print(f"  ⚠️ 讀取檔案時發生錯誤: {str(e)}")

# 執行驗證
validator = IndustryDataValidator()
validator.validate_all()

開始驗證產業相關資料...

1. 檢查必要檔案存在性:
✓ companies_final.csv 存在 (大小: 0.08MB)
  欄位: stock_id, stock_name, industry_category, market_type
✓ industry_index.csv 存在 (大小: 2.59MB)
  欄位: 指數名稱, 收盤指數, 漲跌, 漲跌點數, 漲跌百分比, 日期

2. 驗證產業分類對應:
總股票數: 2335
產業類別數: 37

產業分布:
半導體類: 225 檔股票 (9.6%)
電子工業類: 211 檔股票 (9.0%)
生技醫療類: 188 檔股票 (8.1%)
其他類: 176 檔股票 (7.5%)
光電類: 171 檔股票 (7.3%)
電子零組件類: 125 檔股票 (5.4%)
電機機械類: 119 檔股票 (5.1%)
其他電子類: 117 檔股票 (5.0%)
通信網路類: 103 檔股票 (4.4%)
建材營造類: 87 檔股票 (3.7%)
金融保險類: 82 檔股票 (3.5%)
電腦及週邊設備類: 59 檔股票 (2.5%)
化學類: 57 檔股票 (2.4%)
鋼鐵類: 54 檔股票 (2.3%)
紡織纖維類: 54 檔股票 (2.3%)
化學生技醫療類: 52 檔股票 (2.2%)
資訊服務類: 51 檔股票 (2.2%)
觀光類: 44 檔股票 (1.9%)
汽車類: 38 檔股票 (1.6%)
食品類: 38 檔股票 (1.6%)
數位雲端類: 34 檔股票 (1.5%)
航運類: 34 檔股票 (1.5%)
綠能環保類: 33 檔股票 (1.4%)
塑膠類: 28 檔股票 (1.2%)
貿易百貨類: 23 檔股票 (1.0%)
電器電纜類: 21 檔股票 (0.9%)
居家生活類: 20 檔股票 (0.9%)
電子通路類: 19 檔股票 (0.8%)
觀光餐旅類: 18 檔股票 (0.8%)
油電燃氣類: 13 檔股票 (0.6%)
橡膠類: 12 檔股票 (0.5%)
水泥類: 8 檔股票 (0.3%)
造紙類: 8 檔股票 (0.3%)
運動休閒類: 6 檔股票 (0.3%)
玻璃陶瓷類: 5 檔股票 (0.2%)
農業科技: 1 檔股票 (0.0%)
電子商務類: 1 檔股票 (0.0

In [4]:
def debug_industry_files():
    config = TestFeatureConfig()
    
    # 1. 檢查產業分類檔
    companies_path = config.meta_data_path / 'companies_final.csv'
    if companies_path.exists():
        df = pd.read_csv(companies_path, dtype={'stock_id': str})
        print("產業分類檔案:")
        print(f"- 總筆數: {len(df)}")
        print(f"- 產業類別: {df['industry_category'].unique()}")
        
    # 2. 檢查產業指數檔
    index_path = config.meta_data_path / 'industry_index.csv'
    if index_path.exists():
        df = pd.read_csv(index_path)
        print("\n產業指數檔案:")
        print(f"- 總筆數: {len(df)}")
        print(f"- 指數類別: {df['指數名稱'].unique()}")
        
    # 3. 檢查產業分析檔案
    for subdir in ['return_index', 'price_index']:
        path = config.industry_analysis_path / subdir
        if path.exists():
            files = list(path.glob("*.json"))
            print(f"\n{subdir} 目錄:")
            print(f"- 檔案數: {len(files)}")
            if files:
                print("- 檔案範例:")
                for f in files[:3]:
                    print(f"  * {f.name}")

def debug_industry_mapping(stock_id: str):
    config = TestFeatureConfig()
    generator = TestFeatureGenerator(config)
    
    # 1. 取得股票產業
    industries = generator._get_industry_name(stock_id)
    print(f"股票 {stock_id} 的產業分類: {industries}")
    
    if industries:
        for industry in industries:
            # 2. 標準化產業名稱
            std_name = generator._standardize_industry_name(industry)
            print(f"\n產業名稱標準化:")
            print(f"- 原始: {industry}")
            print(f"- 標準化: {std_name}")
            
            # 3. 讀取產業分析
            analysis_data = generator._read_industry_analysis(industry)
            if analysis_data:
                print("\n產業分析數據:")
                print(f"- 資料區段: {list(analysis_data.keys())}")
            else:
                print("\n警告: 無法讀取產業分析數據")

def debug_industry_features(stock_id: str):
    config = TestFeatureConfig()
    generator = TestFeatureGenerator(config)
    
    # 1. 讀取基本資料
    stock_data = pd.read_csv(
        config.test_data_path / 'test_stock_data.csv',
        dtype={'證券代號': str}
    )
    stock_df = stock_data[stock_data['證券代號'] == stock_id]
    
    # 2. 生成產業特徵
    result_df = generator._add_industry_features(stock_df.copy(), stock_id)
    
    # 3. 比較特徵差異
    new_columns = set(result_df.columns) - set(stock_df.columns)
    
    print(f"股票 {stock_id} 的產業特徵:")
    print(f"- 新增特徵數: {len(new_columns)}")
    if new_columns:
        print("- 新增特徵列表:")
        for col in sorted(new_columns):
            not_null = result_df[col].notna().mean() * 100
            print(f"  * {col} (有效數據: {not_null:.1f}%)")

if __name__ == "__main__":
    print("===== 1. 檢查產業相關檔案 =====")
    debug_industry_files()
    
    print("\n===== 2. 檢查股票產業對應 =====")
    test_stocks = ['2330', '2317', '1101']
    for stock_id in test_stocks:
        debug_industry_mapping(stock_id)
        
    print("\n===== 3. 檢查產業特徵生成 =====")
    for stock_id in test_stocks:
        debug_industry_features(stock_id)

===== 1. 檢查產業相關檔案 =====


NameError: name 'TestFeatureConfig' is not defined

In [5]:
import json
from pathlib import Path
from typing import Dict, List, Optional
from collections import defaultdict

class IndustryJsonAnalyzer:
    """產業分析JSON檔案解析器"""
    
    def __init__(self, base_dir: str = "D:/Min/Python/Project/FA_Data"):
        self.base_dir = Path(base_dir)
        self.industry_dirs = [
            self.base_dir / "industry_analysis" / "return_index",
            self.base_dir / "industry_analysis" / "price_index"
        ]

    def analyze_json_structure(self, file_path: Path) -> Dict:
        """分析單一JSON檔案的結構"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            structure = {
                "filename": file_path.name,
                "top_level_keys": list(data.keys()),
                "sections": {},
                "metrics": defaultdict(list)
            }
            
            # 分析每個主要區段
            for key in data.keys():
                if isinstance(data[key], dict):
                    structure["sections"][key] = self._analyze_section(data[key])
                    # 收集數值型指標
                    self._collect_metrics(data[key], key, structure["metrics"])
            
            return structure
        except Exception as e:
            return {
                "filename": file_path.name,
                "error": f"分析失敗: {str(e)}"
            }

    def _analyze_section(self, section: Dict, prefix: str = "") -> Dict:
        """分析區段的結構"""
        structure = {
            "fields": [],
            "nested_sections": {}
        }
        
        for key, value in section.items():
            full_key = f"{prefix}.{key}" if prefix else key
            
            if isinstance(value, dict):
                structure["nested_sections"][key] = self._analyze_section(value, full_key)
            else:
                if isinstance(value, (int, float)):
                    field_info = f"{key}: {type(value).__name__} = {value}"
                else:
                    field_info = f"{key}: {type(value).__name__}"
                structure["fields"].append(field_info)
        
        return structure

    def _collect_metrics(self, data: Dict, section: str, metrics: Dict, prefix: str = ""):
        """收集數值型指標"""
        for key, value in data.items():
            current_key = f"{prefix}.{key}" if prefix else key
            
            if isinstance(value, dict):
                self._collect_metrics(value, section, metrics, current_key)
            elif isinstance(value, (int, float)):
                metrics[section].append({
                    "name": current_key,
                    "value": value,
                    "type": type(value).__name__
                })

    def print_structured_analysis(self, analysis: Dict):
        """列印結構化分析結果"""
        print(f"\n分析報告 - {analysis['filename']}")
        print("=" * 50)
        
        print("\n1. 主要區段:")
        for key in analysis['top_level_keys']:
            print(f"  - {key}")
        
        print("\n2. 區段詳情:")
        for section, content in analysis['sections'].items():
            print(f"\n{section}:")
            self._print_section_content(content, indent="  ")
        
        print("\n3. 數值指標統計:")
        for section, metrics in analysis['metrics'].items():
            print(f"\n{section} 指標:")
            for metric in metrics:
                print(f"  - {metric['name']}: {metric['value']} ({metric['type']})")

    def _print_section_content(self, content: Dict, indent: str = ""):
        """遞迴列印區段內容"""
        if content['fields']:
            print(f"{indent}欄位:")
            for field in content['fields']:
                print(f"{indent}  - {field}")
        
        if content['nested_sections']:
            print(f"{indent}子區段:")
            for name, subcontent in content['nested_sections'].items():
                print(f"{indent}  {name}:")
                self._print_section_content(subcontent, indent + "    ")

    def analyze_all_files(self):
        """分析所有產業分析檔案"""
        all_analyses = []
        
        for dir_path in self.industry_dirs:
            if not dir_path.exists():
                continue
                
            for file_path in dir_path.glob("*.json"):
                analysis = self.analyze_json_structure(file_path)
                all_analyses.append(analysis)
        
        return all_analyses

    def print_summary(self):
        """列印摘要統計"""
        analyses = self.analyze_all_files()
        
        print("\n產業分析檔案摘要統計")
        print("=" * 50)
        
        # 統計檔案數量
        total_files = len(analyses)
        error_files = sum(1 for a in analyses if 'error' in a)
        
        print(f"\n檔案統計:")
        print(f"- 總檔案數: {total_files}")
        print(f"- 成功解析: {total_files - error_files}")
        print(f"- 解析失敗: {error_files}")
        
        if total_files > 0:
            # 選擇第一個成功的分析來展示詳細結構
            for analysis in analyses:
                if 'error' not in analysis:
                    print("\n示例檔案結構:")
                    self.print_structured_analysis(analysis)
                    break

# 使用範例
if __name__ == "__main__":
    analyzer = IndustryJsonAnalyzer()
    
    # 分析特定檔案
    target_file = Path("D:/Min/Python/Project/FA_Data/industry_analysis/price_index/塑膠化工_20230101_20241230_20241231.json")
    if target_file.exists():
        analysis = analyzer.analyze_json_structure(target_file)
        analyzer.print_structured_analysis(analysis)


分析報告 - 塑膠化工_20230101_20241230_20241231.json

1. 主要區段:
  - basic_info
  - time_series_analysis
  - risk_analysis
  - rotation_analysis
  - technical_analysis
  - investment_suggestions
  - metadata

2. 區段詳情:

basic_info:
  欄位:
    - industry_name: str
    - stocks: list
  子區段:
    period:
      欄位:
        - start: str
        - end: str

time_series_analysis:
  欄位:
    - index_type: str
  子區段:
    trend:
      欄位:
        - slope: float = -0.443237371787683
        - trend_direction: str
        - r2_score: float = 0.8417419124318389
        - trend_strength: float = 0.27334156629205636
      子區段:
        price_range:
          欄位:
            - start_price: float = 814.18
            - end_price: float = 591.71
            - total_return: float = -27.324424574418426
    seasonality:
      子區段:
        monthly:
          欄位:
            - strongest_month: int = 5
            - weakest_month: int = 12
          子區段:
            monthly_pattern:
              欄位:
                - 1: fl

In [None]:
JSON檔案基本資訊
檔名格式: {產業名稱}{起始日期}{結束日期}_{報告生成日期}.json
位置: D:/Min/Python/Project/FA_Data/industry_analysis/{price_index或return_index}/
JSON主要結構
pythonCopy{
    "basic_info": {
        "industry_name": str,     # 產業名稱
        "period": {
            "start": str,         # 起始日期
            "end": str           # 結束日期
        },
        "stocks": List[str]      # 產業內股票列表
    },

    "time_series_analysis": {
        "index_type": str,       # 指數類型
        "trend": {
            "slope": float,       # 趨勢斜率
            "trend_direction": str, # 趨勢方向
            "trend_strength": float, # 趨勢強度
            "r2_score": float,    # R平方值
            "price_range": {
                "start_price": float,
                "end_price": float,
                "total_return": float
            }
        },
        "seasonality": {...},    # 季節性分析
        "lead_lag": {...}        # 領先落後分析
    },

    "risk_analysis": {
        "index_type": str,       # 指數類型
        "ratios": {
            "annual_return": float,       # 年化報酬率
            "annual_volatility": float,   # 年化波動率
            "sharpe_ratio": float        # 夏普比率
        },
        "downside": {
            "downside_volatility": float, # 下檔波動率
            "sortino_ratio": float,      # 索提諾比率
            "loss_frequency": float,     # 虧損頻率
            "avg_loss": float           # 平均虧損
        },
        "tail_risk": {
            "var_95": float,            # 95%風險值
            "var_99": float,            # 99%風險值
            "cvar_95": float,           # 95%條件風險值
            "cvar_99": float,           # 99%條件風險值
            "skewness": float,          # 偏度
            "kurtosis": float           # 峰度
        },
        "drawdown": {
            "max_drawdown": float,       # 最大回撤
            "avg_drawdown": float,       # 平均回撤
            "avg_recovery_time": float,  # 平均恢復時間
            "max_recovery_time": int     # 最長恢復時間
        }
    },

    "rotation_analysis": {
        "strength_ranking": {
            "period_returns": Dict,     # 各產業期間報酬
            "ranking": Dict,            # 強度排名
            "strong_industries": List,   # 強勢產業
            "weak_industries": List     # 弱勢產業
        },
        "momentum_ranking": {
            "scores": Dict,            # 動能分數
            "ranking": Dict            # 動能排名
        },
        "flow_analysis": {
            "indicators": Dict,        # 資金流向指標
            "rankings": Dict           # 排名
        }
    },

    "investment_suggestions": {
        "risk_assessment": str,        # 風險評估
        "timing_suggestions": str,     # 時機建議
        "position_suggestions": str,   # 持倉建議
        "key_points": List[str]       # 關鍵重點
    },

    "metadata": {
        "generation_time": str,        # 生成時間
        "index_type": str,            # 指數類型
        "analysis_period": {
            "start": str,
            "end": str
        }
    }
}
主要數值參考範圍
pythonCopy# risk_analysis 參考值範例
risk_analysis = {
    "ratios": {
        "annual_return": -0.15726943544730662,      # 年化報酬率
        "annual_volatility": 0.14277357638501306,   # 年化波動率
        "sharpe_ratio": -1.2416123482770345         # 夏普比率
    },
    "downside": {
        "downside_volatility": 0.10844458850754797, # 下檔波動率
        "sortino_ratio": -1.6346545077716652,       # 索提諾比率
        "loss_frequency": 0.5197505197505198,       # 虧損頻率
        "avg_loss": -0.006867374795701894          # 平均虧損
    },
    "tail_risk": {
        "var_95": -0.013887794961014015,           # 95%風險值
        "var_99": -0.020996887248591646,           # 99%風險值
        "cvar_95": -0.02158904766390701,           # 95%條件風險值
        "cvar_99": -0.03729531002878461,           # 99%條件風險值
        "skewness": -0.9029288198744725,           # 偏度
        "kurtosis": 7.495963170618174              # 峰度
    },
    "drawdown": {
        "max_drawdown": -0.3319202812538859,       # 最大回撤
        "avg_drawdown": -0.11834489489422215,      # 平均回撤
        "avg_recovery_time": 7.0,                   # 平均恢復時間
        "max_recovery_time": 24                     # 最長恢復時間
    }
}

In [None]:
import pandas as pd
import numpy as np
import logging
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from pathlib import Path
import psutil
import gc

@dataclass
class ValidationResult:
    """驗證結果資料類別"""
    is_valid: bool
    message: str
    details: Dict = None

class NotebookValidator:
    """Notebook 驗證器"""
    
    def __init__(self, config_path: Path):
        self.config_path = config_path
        self.logger = self._setup_logger()
        self.validation_results = {}
        
    def _setup_logger(self):
        """設置日誌"""
        logger = logging.getLogger('NotebookValidator')
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger
        
    def validate_data_structure(self) -> ValidationResult:
        """驗證資料結構"""
        try:
            # 檢查必要目錄
            required_dirs = [
                'meta_data',
                'daily_price',
                'technical_analysis',
                'features'
            ]
            
            missing_dirs = []
            for dir_name in required_dirs:
                if not (self.config_path / dir_name).exists():
                    missing_dirs.append(dir_name)
            
            if missing_dirs:
                return ValidationResult(
                    is_valid=False,
                    message=f"缺少必要目錄: {', '.join(missing_dirs)}",
                    details={'missing_dirs': missing_dirs}
                )
                
            return ValidationResult(
                is_valid=True,
                message="資料結構驗證通過"
            )
            
        except Exception as e:
            return ValidationResult(
                is_valid=False,
                message=f"驗證資料結構時發生錯誤: {str(e)}"
            )
            
    def validate_feature_generation(self, df: pd.DataFrame) -> ValidationResult:
        """驗證特徵生成"""
        try:
            issues = []
            
            # 1. 先列印所有欄位，確認資料結構
            self.logger.info("現有欄位:")
            for col in df.columns:
                self.logger.info(f"- {col}")
            
            # 2. 檢查基礎技術指標的存在性和有效性
            base_indicators = {
                'RSI': 'RSI指標',
                'MACD': 'MACD指標',
                'MACD_signal': 'MACD信號線',
                'MACD_hist': 'MACD柱狀圖',
                'slowk': 'KD指標K線',
                'slowd': 'KD指標D線'
            }
            
            for ind, name in base_indicators.items():
                if ind in df.columns:
                    valid_ratio = df[ind].notna().mean() * 100
                    self.logger.info(f"{name} 有效數據比例: {valid_ratio:.2f}%")
                else:
                    issues.append(f"缺少{name}")
            
            # 3. 檢查衍生技術指標的計算
            derived_indicators = [
                'KD_差值',
                '均線糾結度',
                'RSI_動能',
                'MACD_動能',
                '技術綜合評分'
            ]
            
            # 檢查是否能計算這些指標
            if 'slowk' in df.columns and 'slowd' in df.columns:
                self.logger.info("可以計算KD_差值")
            
            ma_cols = ['SMA30', 'DEMA30', 'EMA30']
            if all(col in df.columns for col in ma_cols):
                self.logger.info("可以計算均線糾結度")
                
            if 'RSI' in df.columns:
                self.logger.info("可以計算RSI_動能")
                
            if 'MACD_hist' in df.columns:
                self.logger.info("可以計算MACD_動能")
                
            # 4. 檢查數值有效性
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            for col in numeric_cols:
                null_ratio = df[col].isnull().mean()
                if null_ratio > 0:  # 有任何空值
                    self.logger.warning(f"欄位 {col} 包含 {null_ratio:.2%} 的空值")
            
            # 5. 檢查日期連續性
            if '日期' in df.columns:
                df['日期'] = pd.to_datetime(df['日期'])
                date_gaps = df['日期'].sort_values().diff().dt.days.dropna()
                if date_gaps.max() > 5:  # 如果有超過5天的間隔
                    self.logger.warning(f"資料存在最大 {date_gaps.max()} 天的間隔")
            
            return ValidationResult(
                is_valid=len(issues) == 0,
                message="特徵生成驗證完成",
                details={
                    'issues': issues,
                    'available_columns': df.columns.tolist(),
                    'base_indicators_status': {
                        ind: ind in df.columns 
                        for ind in base_indicators.keys()
                    }
                }
            )
            
        except Exception as e:
            return ValidationResult(
                is_valid=False,
                message=f"驗證特徵生成時發生錯誤: {str(e)}"
            )
            
    def validate_memory_usage(self) -> ValidationResult:
        """驗證記憶體使用"""
        try:
            process = psutil.Process()
            memory_info = process.memory_info()
            
            # 轉換為GB
            memory_usage_gb = memory_info.rss / (1024 * 1024 * 1024)
            
            issues = []
            if memory_usage_gb > 8:  # 超過8GB
                issues.append(f"記憶體使用量過高: {memory_usage_gb:.2f}GB")
            
            # 檢查記憶體使用趨勢
            initial_memory = memory_usage_gb
            gc.collect()
            final_memory = process.memory_info().rss / (1024 * 1024 * 1024)
            
            if final_memory > initial_memory:
                issues.append("記憶體回收效果不佳")
                
            return ValidationResult(
                is_valid=len(issues) == 0,
                message="記憶體使用驗證完成",
                details={
                    'issues': issues,
                    'memory_usage_gb': memory_usage_gb,
                    'memory_reduction': initial_memory - final_memory
                }
            )
            
        except Exception as e:
            return ValidationResult(
                is_valid=False,
                message=f"驗證記憶體使用時發生錯誤: {str(e)}"
            )
            
    def validate_industry_data(self) -> ValidationResult:
        """驗證產業資料"""
        try:
            issues = []
            
            # 檢查產業檔案
            industry_file = self.config_path / 'meta_data' / 'industry_index.csv'
            if not industry_file.exists():
                issues.append("缺少產業指數檔案")
            else:
                df = pd.read_csv(industry_file)
                
                # 檢查必要欄位
                required_cols = ['日期', '指數名稱', '收盤指數', '漲跌', '漲跌點數', '漲跌百分比']
                missing_cols = [col for col in required_cols if col not in df.columns]
                if missing_cols:
                    issues.append(f"產業指數檔案缺少欄位: {missing_cols}")
                    
                # 檢查資料完整性
                if df.isnull().any().any():
                    null_cols = df.columns[df.isnull().any()].tolist()
                    issues.append(f"產業指數檔案含有空值: {null_cols}")
                    
            return ValidationResult(
                is_valid=len(issues) == 0,
                message="產業資料驗證完成",
                details={'issues': issues}
            )
            
        except Exception as e:
            return ValidationResult(
                is_valid=False,
                message=f"驗證產業資料時發生錯誤: {str(e)}"
            )
            
    def run_all_validations(self, sample_data: pd.DataFrame) -> Dict[str, ValidationResult]:
        """執行所有驗證"""
        self.validation_results = {
            'data_structure': self.validate_data_structure(),
            'feature_generation': self.validate_feature_generation(sample_data),
            'memory_usage': self.validate_memory_usage(),
            'industry_data': self.validate_industry_data()
        }
        
        return self.validation_results
        
    def generate_validation_report(self) -> str:
        """生成驗證報告"""
        report = "Notebook 驗證報告\n" + "="*50 + "\n\n"
        
        for validation_name, result in self.validation_results.items():
            report += f"{validation_name}:\n"
            report += f"通過狀態: {'通過' if result.is_valid else '未通過'}\n"
            report += f"訊息: {result.message}\n"
            
            if result.details:
                report += "詳細資訊:\n"
                for key, value in result.details.items():
                    report += f"  - {key}: {value}\n"
            
            report += "-"*50 + "\n"
            
        return report

def main():
    """主函數"""
    # 設定路徑
    config_path = Path("D:/Min/Python/Project/FA_Data")
    
    # 創建驗證器
    validator = NotebookValidator(config_path)
    
    # 讀取範例資料
    try:
        sample_data = pd.read_csv(config_path / "test_data" / "test_tech_data.csv")
    except Exception as e:
        print(f"讀取範例資料失敗: {str(e)}")
        return
    
    # 執行驗證
    results = validator.run_all_validations(sample_data)
    
    # 生成報告
    report = validator.generate_validation_report()
    
    # 儲存報告
    report_path = config_path / "validation_report.txt"
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write(report)
    
    print(f"驗證報告已儲存至: {report_path}")
    
    # 回傳結果摘要
    failed_validations = [
        name for name, result in results.items() 
        if not result.is_valid
    ]
    
    if failed_validations:
        print(f"\n需要改進的部分: {failed_validations}")
    else:
        print("\n所有驗證都已通過")

if __name__ == "__main__":
    main()