In [21]:
from finlab import data, backtest
from finlab.dataframe import FinlabDataFrame
import numpy as np
import pandas as pd
import talib
import finlab
import os
from datetime import datetime

# 1. 初始設置


def initialize_finlab():
    """初始化 finlab 設置"""
    finlab.login(
        'f/FsMevH43d0EtHf9Tn9YVNymfwFWQPhCjgVxb1WUiWn2bmxmB771cmwwga164D/#vip_m')
    data.set_storage(data.FileStorage(path="E:\\pickle"))
    print("Finlab 初始化完成")

# 2. 數據獲取


def fetch_all_data():
    """獲取所有需要的原始數據"""
    print("開始獲取數據...")

    # 1. 基本價格和交易數據
    close = data.get('price:收盤價')
    high = data.get("price:最高價")
    low = data.get("price:最低價")
    open_price = data.get("price:開盤價")
    volume = data.get("price:成交股數")
    amt = data.get('price:成交金額')
    cap = data.get('etl:market_value')  # 移到這裡，確保在使用前定義
    

    # 2. 融資和大盤數據
    margin_transaction_ratio = data.get('margin_transactions:融資使用率').fillna(0)
    benchmark = data.get('benchmark_return:發行量加權股價報酬指數').squeeze()

    # 3. 計算均線
    sma5 = close.rolling(5).mean()
    sma20 = close.rolling(20).mean()
    sma60 = close.rolling(60).mean()

    # 4. 營收相關
    rev = data.get('monthly_revenue:當月營收')
    rev_ma = rev.rolling(2).mean()  # 營收移動平均
    rev_recent_3 = rev.rolling(3).sum()  # 近3個月營收總和
    當季營收 = rev.rolling(4).sum()

    # 5. 計算價格動量和其他技術指標
    price_momentum_5 = (close - sma5)/sma5
    price_momentum_20 = (close - sma20)/sma20
    price_momentum_60 = (close - sma60)/sma60

    # 6. 計算價格波動率
    close_pct_change = close.pct_change()
    price_volatility = close_pct_change.rolling(20).std()

    # 7. 計算交易活絡度
    trading_activity = volume / cap  # 現在 cap 已經定義了

    # 8. 計算MA反轉
    ma_reversal = sma5/sma20

    # 9. 計算股價營收敏感度
    price_rev_sensitivity = (close - close.shift(20)) / (rev - rev.shift(1))

    # 10. 營收相關
    yoy = data.get('monthly_revenue:去年同月增減(%)')
    cyoy = data.get('monthly_revenue:前期比較增減(%)')

    # 11. 財務指標
    BP_ratio = data.get("price_earning_ratio:股價淨值比")
    pe = data.get('price_earning_ratio:本益比')
    營業利益成長率 = data.get('fundamental_features:營業利益成長率')
    營業利益率 = data.get("fundamental_features:營業利益率")

    # 12. 研發與營收
    研究發展費 = data.get("financial_statement:研究發展費")
    營業收入淨額 = data.get("financial_statement:營業收入淨額")

    # 13. 計算衍生指標
    peg = pe/營業利益成長率
    市值營收比 = cap / 當季營收
    research_ratio = 研究發展費 / 營業收入淨額 * 100

    # 14. 乖離率計算
    bias_sma_250 = close / close.rolling(250).mean() - 1
    bias_sma_60 = close / close.rolling(60).mean() - 1
    bias_sma_5 = close / close.rolling(5).mean() - 1

    # 15. 其他財務指標
    稅後淨利 = data.get('fundamental_features:經常稅後淨利')
    權益總計 = data.get('financial_statement:股東權益總額')
    股東權益報酬率 = 稅後淨利 / 權益總計
    營業現金流 = data.get('financial_statement:營業活動之淨現金流入_流出')
    投資現金流 = data.get('financial_statement:投資活動之淨現金流入_流出')
    融資現金流 = data.get('financial_statement:籌資活動之淨現金流入_流出')

    # 16. 峰度計算
    kurtosis_250 = close.pct_change().rolling(250).kurt()
    kurtosis_120 = close.pct_change().rolling(120).kurt()

    # 17. 均線斜率計算
    slopesma5 = close.rolling(5).mean().pipe(lambda df: df / df.shift(5) - 1)
    slopesma20 = close.rolling(20).mean().pipe(
        lambda df: df / df.shift(20) - 1)
    slopesma60 = close.rolling(60).mean().pipe(
        lambda df: df / df.shift(60) - 1)

    # 18. 投信買賣超數據
    fund_net_buy = data.get('institutional_investors_trading_summary:投信買賣超股數')
    fund_ratio = fund_net_buy / volume

    # 19. 更多營收數據
    上月營收 = data.get('monthly_revenue:上月營收')
    去年當月營收 = data.get('monthly_revenue:去年當月營收')
    上月比較增減 = data.get('monthly_revenue:上月比較增減(%)')
    當月累計營收 = data.get('monthly_revenue:當月累計營收')
    去年累計營收 = data.get('monthly_revenue:去年累計營收')

    # 20. 獲利成長率
    稅前淨利成長率 = data.get('fundamental_features:稅前淨利成長率')
    稅後淨利成長率 = data.get('fundamental_features:稅後淨利成長率')

    print("數據獲取完成")
    return locals()


# 3. 數據清理函數


def clean_data(df):
    """清理數據，處理不同頻率的數據填充"""
    print("開始清理數據...")

    try:
        # 定義不同頻率的數據列
        daily_cols = ['開盤價', '最高價', '最低價', '收盤價', 'SMA5', 'SMA20', 'SMA60',
                      '成交股數', '市值', '乖離率_SMA250', '乖離率_SMA60', '乖離率_SMA5',
                      'SMA5斜率', 'SMA20斜率', 'SMA60斜率', '投信買賣超', '投信買賣超佔比',
                      '成交金額', '融資使用率', '大盤指數', '價格動量5', '價格動量20',
                      '價格動量60', '價格波動率', '交易活絡度', 'MA反轉']

        monthly_cols = ['當月營收', '同月增減(%)', '前期增減(%)', '當季營收', '市值營收比',
                        '上月營收', '去年當月營收', '上月比較增減', '當月累計營收', '去年累計營收',
                        '營收MA2', '近3月營收', '股價營收敏感度']

        quarterly_cols = ['營業利益成長率', '營業利益率', '研究發展費', '營業收入淨額', '研發費用比率',
                          '稅後淨利', '權益總計', '股東權益報酬率', '營業現金流', '投資現金流', '融資現金流',
                          '稅前淨利成長率', '稅後淨利成長率']

        # 檢查數據框中實際存在的列
        existing_daily_cols = [col for col in daily_cols if col in df.columns]
        existing_monthly_cols = [
            col for col in monthly_cols if col in df.columns]
        existing_quarterly_cols = [
            col for col in quarterly_cols if col in df.columns]

        # 打印實際處理的列名（用於調試）
        print("處理的日頻數據列:", existing_daily_cols)
        print("處理的月頻數據列:", existing_monthly_cols)
        print("處理的季頻數據列:", existing_quarterly_cols)

        # 處理無限值
        df = df.replace([np.inf, -np.inf], np.nan)

        # 根據不同頻率填充數據
        if existing_daily_cols:
            df[existing_daily_cols] = df[existing_daily_cols].ffill().bfill()

        if existing_monthly_cols:
            df[existing_monthly_cols] = df[existing_monthly_cols].ffill(
                limit=30)

        if existing_quarterly_cols:
            df[existing_quarterly_cols] = df[existing_quarterly_cols].ffill(
                limit=90)

        # 基本數據檢查
        # 確保價格和成交量為正
        if '收盤價' in df.columns:
            df.loc[df['收盤價'] <= 0, '收盤價'] = np.nan

        if '成交股數' in df.columns:
            df.loc[df['成交股數'] < 0, '成交股數'] = 0

        # 檢查清理後的數據狀況
        null_counts = df.isnull().sum()
        if null_counts.any():
            print("\n清理後仍存在的空值數量:")
            print(null_counts[null_counts > 0])

        print("數據清理完成")
        
     # 添加數據品質檢查
        if df['收盤價'].max() > 10000 or df['收盤價'].min() < 0:
            print("警告：發現異常價格數據")

        if df['成交股數'].min() < 0:
            print("警告：發現負的成交量")

        # 添加資料完整性檢查
        daily_missing = df[['收盤價', '成交股數']].isnull().sum()
        if daily_missing.any():
            print("警告：日頻數據存在缺失")
            print(daily_missing)

        # 計算數據覆蓋率
        coverage = (df.count() / len(df) * 100).round(2)
        print("\n數據覆蓋率（%）:")
        print(coverage)

        def handle_outliers(df, column, n_std=3):
            """使用標準差法處理異常值"""
            mean = df[column].mean()
            std = df[column].std()
            df.loc[df[column] > mean + n_std*std, column] = mean + n_std*std
            df.loc[df[column] < mean - n_std*std, column] = mean - n_std*std
            return df

        # 對關鍵指標處理異常值
        numeric_cols = ['成交股數', '成交金額', '價格波動率', '交易活絡度']
        for col in numeric_cols:
            if col in df.columns:
                df = handle_outliers(df, col)

        # 新增：計算月度數據的變化率
        if '當月營收' in df.columns and '上月營收' in df.columns:
            df['營收月增率'] = (df['當月營收'] - df['上月營收']) / df['上月營收'] * 100

        # 新增：技術指標的交叉信號
        if all(col in df.columns for col in ['SMA5', 'SMA20']):
            df['黃金交叉'] = (df['SMA5'] > df['SMA20']) & (df['SMA5'].shift(1) <= df['SMA20'].shift(1))
            df['死亡交叉'] = (df['SMA5'] < df['SMA20']) & (df['SMA5'].shift(1) >= df['SMA20'].shift(1))

        # 新增：資金面指標
        if '投信買賣超' in df.columns and '成交金額' in df.columns:
            df['投信影響力'] = df['投信買賣超'] * df['收盤價'] / df['成交金額']

        return df

    except Exception as e:
        print(f"清理數據時發生錯誤: {str(e)}")
        print("數據框的列名:", df.columns.tolist())
        raise


def analyze_data(df):
    """新增的數據分析函數"""
    try:
        # 1. 技術面分析
        tech_analysis = {
            '均線系統': {
                'SMA5': df['SMA5'].iloc[-1],
                'SMA20': df['SMA20'].iloc[-1],
                'SMA60': df['SMA60'].iloc[-1],
                '乖離率': df['乖離率_SMA20'].iloc[-1]
            },
            '成交量分析': {
                '20日均量': df['成交股數'].rolling(20).mean().iloc[-1],
                '量增率': df['成交股數'].pct_change().iloc[-1],
                '成交值': df['成交金額'].iloc[-1]
            },
            '技術指標': {
                'RSI': None,  # 需要另外計算
                '投信動向': df['投信買賣超'].iloc[-1],
                '融資使用率': df['融資使用率'].iloc[-1]
            }
        }

        # 2. 基本面分析
        if '當月營收' in df.columns:
            fundamental_analysis = {
                '營收表現': {
                    '當月營收': df['當月營收'].iloc[-1],
                    '年增率': df['上月比較增減'].iloc[-1],
                    '累計營收': df['當月累計營收'].iloc[-1]
                }
            }

        # 3. 資金面分析
        if '投信買賣超' in df.columns:
            fund_analysis = {
                '投信': df['投信買賣超'].rolling(20).sum().iloc[-1],
                '投信佔比': df['投信買賣超佔比'].rolling(20).mean().iloc[-1]
            }

        return {
            '技術面': tech_analysis,
            '基本面': fundamental_analysis if '當月營收' in df.columns else None,
            '資金面': fund_analysis if '投信買賣超' in df.columns else None
        }

    except Exception as e:
        print(f"分析數據時發生錯誤: {str(e)}")
        raise

# 4. 創建綜合數據框架


def create_comprehensive_df(data_dict, symbol='6510'):
    """創建綜合數據框架"""
    print(f"開始處理 {symbol} 的數據...")

    # 確保股票代碼是字符串格式
    symbol = str(symbol)

    try:
        # 創建數據字典，加入錯誤處理
        df_dict = {}

        # 基本數據
        df_dict.update({
            '開盤價': data_dict['open_price'][symbol],
            '最高價': data_dict['high'][symbol],
            '最低價': data_dict['low'][symbol],
            '收盤價': data_dict['close'][symbol],
            '成交股數': data_dict['volume'][symbol],
            '成交金額': data_dict['amt'][symbol],
            '市值': data_dict['cap'][symbol],
            '大盤指數': data_dict['benchmark']  # 加入大盤指數

        })

        # 均線數據
        df_dict.update({
            'SMA5': data_dict['sma5'][symbol],
            'SMA20': data_dict['sma20'][symbol],
            'SMA60': data_dict['sma60'][symbol]
        })

        # 乖離率
        df_dict.update({
            '乖離率_SMA250': data_dict['bias_sma_250'][symbol],
            '乖離率_SMA60': data_dict['bias_sma_60'][symbol],
            '乖離率_SMA5': data_dict['bias_sma_5'][symbol]
        })

        # 動量指標
        df_dict.update({
            '價格動量5': data_dict['price_momentum_5'][symbol],
            '價格動量20': data_dict['price_momentum_20'][symbol],
            '價格動量60': data_dict['price_momentum_60'][symbol],
            '價格波動率': data_dict['price_volatility'][symbol]
        })

        # 交易相關
        df_dict.update({
            '交易活絡度': data_dict['trading_activity'][symbol],
            'MA反轉': data_dict['ma_reversal'][symbol],
            '融資使用率': data_dict['margin_transaction_ratio'][symbol]
        })

        # 營收相關
        df_dict.update({
            '當月營收': data_dict['rev'][symbol],
            '營收MA2': data_dict['rev_ma'][symbol],
            '近3月營收': data_dict['rev_recent_3'][symbol],
            '股價營收敏感度': data_dict['price_rev_sensitivity'][symbol],
            '上月營收': data_dict['上月營收'][symbol],
            '去年當月營收': data_dict['去年當月營收'][symbol],
            '上月比較增減': data_dict['上月比較增減'][symbol],
            '當月累計營收': data_dict['當月累計營收'][symbol],
            '去年累計營收': data_dict['去年累計營收'][symbol]
        })

        # 均線斜率
        df_dict.update({
            'SMA5斜率': data_dict['slopesma5'][symbol],
            'SMA20斜率': data_dict['slopesma20'][symbol],
            'SMA60斜率': data_dict['slopesma60'][symbol]
        })

        # 投信數據
        df_dict.update({
            '投信買賣超': data_dict['fund_net_buy'][symbol],
            '投信買賣超佔比': data_dict['fund_ratio'][symbol]
        })

        # 轉換為DataFrame並處理索引
        df = pd.DataFrame(df_dict)

        # 確保索引是日期格式
        if not isinstance(df.index, pd.DatetimeIndex):
            df.index = pd.to_datetime(df.index)

        # 排序並刪除重複的索引
        df = df.sort_index()
        df = df[~df.index.duplicated(keep='first')]

        print(f"{symbol} 數據處理完成")
        return df

    except KeyError as e:
        print(f"錯誤：無法找到股票代碼 {symbol} 的某些數據: {str(e)}")
        raise
    except Exception as e:
        print(f"處理數據時發生錯誤: {str(e)}")
        raise



# 5. 數據分析和儲存


# 修改 analyze_and_save_data 函數，使用更安全的儲存路徑
def analyze_data(df):
    """新增的數據分析函數，包含錯誤處理和欄位驗證"""
    try:
        # 1. 技術面分析
        tech_analysis = {}

        # 均線系統
        ma_cols = {
            'SMA5': 'SMA5',
            'SMA20': 'SMA20',
            'SMA60': 'SMA60'
        }

        ma_system = {}
        for name, col in ma_cols.items():
            if col in df.columns:
                ma_system[name] = df[col].iloc[-1]

        # 檢查乖離率欄位
        bias_cols = ['乖離率_SMA250', '乖離率_SMA60', '乖離率_SMA5']
        for col in bias_cols:
            if col in df.columns:
                ma_system['乖離率'] = df[col].iloc[-1]
                break

        if ma_system:
            tech_analysis['均線系統'] = ma_system

        # 成交量分析
        volume_analysis = {}
        if '成交股數' in df.columns:
            volume_analysis['20日均量'] = df['成交股數'].rolling(20).mean().iloc[-1]
            volume_analysis['量增率'] = df['成交股數'].pct_change().iloc[-1]

        if '成交金額' in df.columns:
            volume_analysis['成交值'] = df['成交金額'].iloc[-1]

        if volume_analysis:
            tech_analysis['成交量分析'] = volume_analysis

        # 技術指標
        tech_indicators = {}

        # 計算RSI
        if '收盤價' in df.columns:
            price_change = df['收盤價'].diff()
            gain = (price_change.where(price_change > 0, 0)
                    ).rolling(window=14).mean()
            loss = (-price_change.where(price_change < 0, 0)
                    ).rolling(window=14).mean()
            rs = gain / loss
            rsi = 100 - (100 / (1 + rs))
            tech_indicators['RSI'] = rsi.iloc[-1]

        if '投信買賣超' in df.columns:
            tech_indicators['投信動向'] = df['投信買賣超'].iloc[-1]

        if '融資使用率' in df.columns:
            tech_indicators['融資使用率'] = df['融資使用率'].iloc[-1]

        if tech_indicators:
            tech_analysis['技術指標'] = tech_indicators

        # 2. 基本面分析
        fundamental_analysis = None
        if '當月營收' in df.columns:
            fundamental_analysis = {
                '營收表現': {
                    '當月營收': df['當月營收'].iloc[-1]
                }
            }

            if '上月比較增減' in df.columns:
                fundamental_analysis['營收表現']['年增率'] = df['上月比較增減'].iloc[-1]

            if '當月累計營收' in df.columns:
                fundamental_analysis['營收表現']['累計營收'] = df['當月累計營收'].iloc[-1]

        # 3. 資金面分析
        fund_analysis = None
        if '投信買賣超' in df.columns:
            fund_analysis = {
                '投信': df['投信買賣超'].rolling(20).sum().iloc[-1]
            }

            if '投信買賣超佔比' in df.columns:
                fund_analysis['投信佔比'] = df['投信買賣超佔比'].rolling(
                    20).mean().iloc[-1]

        # 4. 組合最終結果
        result = {
            '技術面': tech_analysis if tech_analysis else None,
            '基本面': fundamental_analysis,
            '資金面': fund_analysis
        }

        # 5. 打印可用的欄位（用於調試）
        print("\n可用的數據欄位：")
        print(df.columns.tolist())

        return result

    except Exception as e:
        print(f"分析數據時發生錯誤: {str(e)}")
        print("可用的數據欄位：", df.columns.tolist())
        raise


def analyze_and_save_data(df, symbol, base_path='data'):
    """分析數據並儲存結果，使用 UTF-8-SIG 編碼"""
    try:
        # 創建儲存目錄
        save_dir = os.path.join(base_path, symbol)
        os.makedirs(save_dir, exist_ok=True)

        # 儲存原始數據，使用 UTF-8-SIG 編碼
        df.to_csv(os.path.join(save_dir, f'{symbol}_data.csv'),
                  encoding='utf-8-sig',
                  index=True)

        # 生成分析結果
        analysis_results = analyze_data(df)

        # 將分析結果轉換為 DataFrame 並儲存，使用 UTF-8-SIG 編碼
        analysis_df = pd.DataFrame.from_dict(analysis_results, orient='index')
        analysis_df.to_csv(os.path.join(save_dir, f'{symbol}_analysis.csv'),
                           encoding='utf-8-sig',
                           index=True)

        # 儲存詳細分析報告
        with open(os.path.join(save_dir, f'{symbol}_report.txt'), 'w',
                  encoding='utf-8-sig') as f:
            f.write(f"股票代碼 {symbol} 分析報告\n")
            f.write("=" * 50 + "\n")

            for category, details in analysis_results.items():
                if details:
                    f.write(f"\n{category}:\n")
                    for key, value in details.items():
                        if isinstance(value, dict):
                            f.write(f"  {key}:\n")
                            for subkey, subvalue in value.items():
                                if subvalue is not None:
                                    f.write(f"    {subkey}: {subvalue}\n")
                        else:
                            if value is not None:
                                f.write(f"  {key}: {value}\n")

        print(f"數據和分析結果已儲存到 {save_dir}")
        return analysis_results

    except Exception as e:
        print(f"儲存數據時發生錯誤: {str(e)}")
        raise




# 6. 主程序


def main(symbol='6510'):
    """主程序"""
    try:
        print(f"開始處理股票代碼：{symbol}")

        # 1. 初始化 finlab
        initialize_finlab()

        # 2. 獲取所有原始數據
        data_dict = fetch_all_data()
        if not data_dict:
            raise ValueError("無法獲取數據")

        # 3. 創建綜合數據框架
        df = create_comprehensive_df(data_dict, symbol)
        if df.empty:
            raise ValueError(f"無法為股票 {symbol} 創建數據框架")

        # 4. 清理數據
        df = clean_data(df)
        if df.empty:
            raise ValueError("數據清理後為空")

        # 5. 分析數據
        analysis_results = analyze_data(df)
        if not analysis_results:
            print("警告：分析結果為空")

        # 6. 儲存和分析結果
        analyze_and_save_data(df, symbol)

        # 7. 打印分析結果
        print("\n=== 分析結果摘要 ===")
        for category, details in analysis_results.items():
            if details:
                print(f"\n{category}:")
                for key, value in details.items():
                    if isinstance(value, dict):
                        print(f"  {key}:")
                        for subkey, subvalue in value.items():
                            if subvalue is not None:
                                print(f"    {subkey}: {subvalue:.2f}" if isinstance(subvalue, (float, np.float64))
                                      else f"    {subkey}: {subvalue}")
                    else:
                        if value is not None:
                            print(f"  {key}: {value:.2f}" if isinstance(value, (float, np.float64))
                                  else f"  {key}: {value}")

        return df, analysis_results

    except Exception as e:
        print(f"執行過程中發生錯誤: {str(e)}")
        import traceback
        traceback.print_exc()
        return None, None



def fetch_and_clean_data(symbol):
    """獲取和清理數據的輔助函數"""
    try:
        # 1. 獲取原始數據
        data_dict = fetch_all_data()

        # 2. 創建綜合數據框架
        df = create_comprehensive_df(data_dict, symbol)

        # 3. 清理數據
        df = clean_data(df)

        return df

    except Exception as e:
        print(f"獲取和清理數據時發生錯誤: {str(e)}")
        raise
    

if __name__ == "__main__":
    try:
        # 使用預設股票代碼
        print("\n使用預設股票代碼測試:")
        df1, analysis1 = main()

        # 使用指定股票代碼
        # print("\n使用以下股票代碼測試:")
        # df2, analysis2 = main('2454')

    except Exception as e:
        print(f"程式執行失敗: {str(e)}")


使用預設股票代碼測試:
開始處理股票代碼：6510
輸入成功!
Finlab 初始化完成
開始獲取數據...


  close_pct_change = close.pct_change()
  kurtosis_250 = close.pct_change().rolling(250).kurt()
  kurtosis_120 = close.pct_change().rolling(120).kurt()


數據獲取完成
開始處理 6510 的數據...
6510 數據處理完成
開始清理數據...
處理的日頻數據列: ['開盤價', '最高價', '最低價', '收盤價', 'SMA5', 'SMA20', 'SMA60', '成交股數', '市值', '乖離率_SMA250', '乖離率_SMA60', '乖離率_SMA5', 'SMA5斜率', 'SMA20斜率', 'SMA60斜率', '投信買賣超', '投信買賣超佔比', '成交金額', '融資使用率', '大盤指數', '價格動量5', '價格動量20', '價格動量60', '價格波動率', '交易活絡度', 'MA反轉']
處理的月頻數據列: ['當月營收', '上月營收', '去年當月營收', '上月比較增減', '當月累計營收', '去年累計營收', '營收MA2', '近3月營收', '股價營收敏感度']
處理的季頻數據列: []

清理後仍存在的空值數量:
當月營收       3032
營收MA2      3048
近3月營收      3072
股價營收敏感度    3346
上月營收       3032
去年當月營收     3032
上月比較增減     3048
當月累計營收     3032
去年累計營收     3032
dtype: int64
數據清理完成

數據覆蓋率（%）:
開盤價           100.00
最高價           100.00
最低價           100.00
收盤價           100.00
成交股數          100.00
成交金額          100.00
市值            100.00
大盤指數          100.00
SMA5          100.00
SMA20         100.00
SMA60         100.00
乖離率_SMA250    100.00
乖離率_SMA60     100.00
乖離率_SMA5      100.00
價格動量5         100.00
價格動量20        100.00
價格動量60        100.00
價格波動率         100.00
交易活絡度         100.00
MA反轉   