In [10]:
import pandas as pd
import numpy as np
import time
from pathlib import Path
import requests
from bs4 import BeautifulSoup
from lxml import etree
import re
import random

In [11]:
# https://doc.twse.com.tw/server-java/t57sb01?step=1&colorchg=1&co_id=4552&year=105&mtype=A
# NOTE: securities firms and similar industries report no gross profit, and their revenue line is not labelled 營業收入.

# Raw statement columns: every scraped item is immediately followed by its
# prior-period counterpart (same name prefixed with 前一期).
select_list = [
    prefix + item
    for item in (
        '營業收入',
        '本期淨利',
        '每股盈餘',
        '歸屬於母公司業主之權益合計',
        '已發行股份總數',
        '負債總計',
        '資產總計',
    )
    for prefix in ('', '前一期')
]

# Derived ratio columns, in output order.
cal_list = [
    # growth rates of the raw figures
    "營收成長率",        # (revenue - prior revenue) / prior revenue
    "本期淨利成長率",    # (net income - prior net income) / prior net income
    "每股盈餘成長率",    # (EPS - prior EPS) / prior EPS
    # return on equity
    "ROE",               # net income / equity attributable to owners of the parent
    "前一期ROE",         # prior net income / prior equity
    "ROE成長率",         # (ROE - prior ROE) / prior ROE
    # return on assets
    "ROA",               # net income / total assets
    "前一期ROA",         # prior net income / prior total assets
    "ROA成長率",
    # book value per share
    "每股淨值",          # equity / shares outstanding (raw value must be divided by 10)
    "前一期每股淨值",    # prior equity / prior common share capital
    "每股淨值成長率",
    # leverage
    "負債比",            # total liabilities / total assets
    "前一期負債比",      # prior total liabilities / prior total assets
    "負債比成長率",
    # "股價淨值比"       # share price / book value per share (currently disabled)
]

# Full set of value columns: derived ratios first, then the raw figures.
new_col = [*cal_list, *select_list]

In [12]:
# https://mopsov.twse.com.tw/server-java/t164sb01?step=1&CO_ID=6725&SYEAR=2024&SSEASON=2&REPORT_ID=C#StatementOfComprehensiveIncome 綜合損益表
# https://mopsov.twse.com.tw/server-java/t164sb01?step=1&CO_ID=6725&SYEAR=2024&SSEASON=2&REPORT_ID=C#BalanceSheet 資產負債表
# https://mopsov.twse.com.tw/server-java/t164sb01?step=1&CO_ID=6725&SYEAR=2024&SSEASON=2&REPORT_ID=C#StatementsOfCashFlows 現金流量表
# https://doc.twse.com.tw/server-java/t57sb01?step=1&colorchg=1&co_id=4552&year=105&mtype=A　財報上傳日期
# https://doc.twse.com.tw/server-java/t57sb01?step=1&colorchg=1&co_id=8488&year=104&seamon=&mtype=A&
# https://openapi.twse.com.tw/v1/opendata/t187ap03_L　     上市基本資料
# https://www.tpex.org.tw/openapi/v1/mopsfin_t187ap03_O    上櫃基本資料
# https://www.tpex.org.tw/openapi/v1/mopsfin_t187ap03_R    興櫃基本資料

# 轉為浮點數
def to_number(text):
    """Convert a scraped cell value to a number.

    Args:
        text: ``None``, an ``int``/``float`` (returned unchanged), or any
            object whose ``str()`` form holds a number, optionally with
            thousands separators, surrounding whitespace, or accounting-style
            parentheses marking a negative value, e.g. ``"(1,234)"``.

    Returns:
        The parsed ``float`` (negated when the text was wrapped in
        parentheses), the original value for ``int``/``float`` input, or
        ``None`` when the text is empty or does not contain a parsable number.
    """
    if text is None:
        return None
    if isinstance(text, (int, float)):
        return text
    s = str(text).strip()
    if s == "":
        return None
    # Accounting notation: a value wrapped in parentheses is negative.
    # Both half-width and full-width parentheses are recognised.
    negative = False
    if len(s) >= 2 and s[0] in "(（" and s[-1] in ")）":
        negative = True
        s = s[1:-1]
    # Keep only digits, the decimal point and the minus sign.
    s = re.sub(r"[^\d.\-]", "", s)
    if s == "":
        return None
    try:
        num = float(s)
    except ValueError:
        # Residue such as "-" or "1.2.3" is not a number; report "no value"
        # the same way as an empty cell instead of raising.
        return None
    return -num if negative else num


# list轉為日期格式
def to_datetime(text_list):
    """Convert ROC-calendar date strings into pandas Timestamps.

    Each entry is expected to look like ``"108/08/14 12:00:22"``: only the
    part before the first space is used, the year is shifted by 1911 to the
    Gregorian calendar, and the time of day is discarded.

    Args:
        text_list: Iterable of values convertible with ``str()``; a falsy
            value (``None``, empty list) yields an empty list.

    Returns:
        A list of ``pd.Timestamp``. Entries that do not have exactly three
        ``/``-separated date fields, or that fail to parse, are skipped, so
        the result may be shorter than the input.
    """
    if not text_list:
        return []

    parsed = []
    for raw in text_list:
        cleaned = str(raw).strip()
        try:
            date_fields = cleaned.split(' ')[0].split('/')
            if len(date_fields) != 3:
                continue
            roc_year, month, day = date_fields
            gregorian_year = int(roc_year) + 1911
            parsed.append(pd.to_datetime(f"{gregorian_year}-{month}-{day}"))
        except Exception:
            # Best effort: an unparsable entry is dropped rather than aborting the batch.
            continue

    return parsed


# 搜尋投標前最接近的財報上傳日期
def search_year_season(code, date:pd.Timestamp):
    """Find the most recent financial report uploaded before ``date``.

    Queries the TWSE document listing (t57sb01) for the ROC year of ``date``
    and for the year before it. Consolidated reports (IFRSs合併財報) are
    preferred; individual reports (IFRSs個別財報) are used only when a year
    lists no consolidated upload date.

    Args:
        code: Company code sent as ``co_id``.
        date: Cut-off; only uploads strictly earlier than this are considered.

    Returns:
        ``(upload_timestamp, "<ROC year>/s<season>", "合併" | "個別")`` for the
        latest qualifying upload, e.g. ``(Timestamp('2025-11-13'), '114/s3',
        '合併')``, or ``None`` when an argument is missing, nothing qualifies,
        the request fails, or the server keeps rate limiting the query.
    """
    print("update time search_: ", code, ", date : ", date)
    try:
        if code is None or date is None:
            print("code or date is None")
            return None

        url = "https://doc.twse.com.tw/server-java/t57sb01"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        }
        max_retries = 5  # attempts per year before giving up
        wait = 15        # seconds to back off after a blocked response
        candidates = []

        for i in range(2):
            params = {
                "step": 1,
                "colorchg": 1,
                "co_id": code,
                "year": date.year-1911-i,
                "mtype": "A",
            }

            for retry_count in range(1, max_retries + 1):
                # A timeout keeps one stalled connection from hanging the whole run.
                req = requests.get(url, params=params, headers=headers, timeout=15)
                req.encoding = 'big5'
                if "查詢過量" not in req.text and req.status_code == 200:
                    break
                print(f"!!! 被阻擋了！第 {retry_count} 次重試，等待 {wait} 秒...")
                time.sleep(wait)
            else:
                # Every attempt was blocked. Parsing the block page would look
                # like "no reports" and could silently select an older report,
                # so fail the lookup and let the caller retry later.
                raise RuntimeError(f"still rate limited after {max_retries} attempts")

            tree = etree.HTML(req.text)

            def cell_pairs(row_xpath):
                # Read the upload date (td[10]) and the season (td[2]) from the
                # SAME row so the two values can never get out of step.
                pairs = []
                for row in tree.xpath(row_xpath):
                    dates = [t for t in row.xpath("td[10]/text()") if t.strip()]
                    seasons = [t for t in row.xpath("td[2]/text()") if t.strip()]
                    if dates and seasons:
                        pairs.append((dates[0], seasons[0]))
                return pairs

            f_type = "合併"
            pairs = cell_pairs("//tr[td[text()='IFRSs合併財報']]")
            if not pairs:
                f_type = "個別"
                pairs = cell_pairs("//tr[td[contains(text(), 'IFRSs個別財報')]]")
            if not pairs:
                print("當期無財報，準備搜尋前一季...")
                continue

            for raw_date, raw_season in pairs:
                uploaded = to_datetime([raw_date])
                if not uploaded:
                    # Unparsable upload date: drop this row only.
                    continue
                # "114 年 第三季" -> "114/s3"
                season_key = (raw_season.replace(' 年 ', '/s')
                                        .replace('第一季', '1')
                                        .replace('第二季', '2')
                                        .replace('第三季', '3')
                                        .replace('第四季', '4').strip())
                candidates.append((uploaded[0], season_key, f_type))

        result = max((p for p in candidates if p[0] < date), key=lambda x: x[0], default=None)

        return result   # (Timestamp('2025-11-13 00:00:00'), '114/s3', '合併')

    except Exception as e:
        print(f"財報上傳查詢 {code} 連線失敗：{e}")
        return None


In [13]:
# Maps each statement item to the row labels it may appear under, in priority
# order: get_report() uses the first label that yields a value, so more
# specific labels must come before generic ones.
fallback_dict = {
    "營業收入": ["營業收入合計", "收益合計", "淨收益", "收益"],
    # The duplicated "母公司業主（淨利／損）" entry was removed: a second copy
    # further down the list could never match once the first had failed.
    "本期淨利": ["母公司業主（淨利／損）", "本期稅後淨利（損）歸屬於母公司業主", "母公司業主", "本期淨利（淨損）", "本期淨利"],
    "每股盈餘": ["基本每股盈餘合計", "基本每股盈餘"],
    "歸屬於母公司業主之權益合計": ["歸屬於母公司業主之權益合計", "權益總計", "權益總額"],
    "已發行股份總數": ["普通股股本"],
    "負債總計": ["負債總計"],
    "資產總計": ["資產總計"]
}

In [14]:

def make_xpath(label, col_index):
    """Build the XPath selecting the text of one column in the row named ``label``.

    A row matches when one of its ``td`` cells equals ``label`` after
    full-width spaces (U+3000) and non-breaking spaces (U+00A0) are turned
    into ordinary spaces and ``normalize-space`` has trimmed and collapsed
    the whitespace (spaces, tabs, newlines, carriage returns).

    Args:
        label: Exact row label to look for; it is embedded in a single-quoted
            XPath literal, so it must not contain a single quote.
        col_index: 1-based index of the ``td`` whose text nodes are returned.

    Returns:
        The XPath expression as a string. It selects every descendant text
        node of the cell, including whitespace-only ones.
    """
    # Both exotic space characters are mapped to ' ' so normalize-space can
    # strip them; previously only the full-width space was handled.
    return (
        "//tr[td[normalize-space(translate(., '\u3000\u00a0', '  '))="
        f"'{label}']]/td[{col_index}]//text()"
    )


def get_report(tree, fallback_dict):
    """Extract current and prior-period figures from a parsed statement page.

    For every item in ``fallback_dict`` the candidate labels are tried in
    order. The first label whose second column (``td[2]``) holds a parsable
    number supplies the current value; the third column (``td[3]``) of the
    same row supplies the prior-period value.

    Args:
        tree: Parsed document exposing ``xpath()``.
        fallback_dict: Mapping of item name -> list of candidate row labels.

    Returns:
        A dict with two entries per item, ``key`` and ``"前一期" + key``, each
        rounded to 3 decimals or ``None`` when no value was found. Values of
        ``已發行股份總數`` are multiplied by 100.
    """
    def first_number(text_nodes):
        # A cell may start with whitespace-only text nodes; use the first
        # node that actually parses instead of blindly taking index 0.
        for node in text_nodes:
            try:
                value = to_number(node)
            except ValueError:
                continue
            if value is not None:
                return value
        return None

    report = {}

    for key, possible_labels in fallback_dict.items():
        scale = 100 if key == "已發行股份總數" else 1
        prev_key = f"前一期{key}"
        # Default to "not found"; overwritten as soon as a label matches.
        report[key] = report[prev_key] = None

        for label in possible_labels:
            current = first_number(tree.xpath(make_xpath(label, 2)))
            if current is None:
                # Row missing or its value is blank/unparsable: try the next label.
                continue
            previous = first_number(tree.xpath(make_xpath(label, 3)))
            report[key] = round(current * scale, 3)
            report[prev_key] = None if previous is None else round(previous * scale, 3)
            break

    return report


In [15]:
def calculate_ratios(df, dec):
    """Add growth, profitability, book-value and leverage ratios to ``df``.

    Args:
        df: DataFrame holding the raw statement columns (current and
            ``前一期``-prefixed prior-period values). Non-numeric cells are
            treated as missing for the calculation; the input columns
            themselves are left untouched.
        dec: Number of decimals the stored ratios are rounded to.

    Returns:
        The same ``df`` object with the ratio columns added or overwritten.
        A growth rate is 0 where the prior value is exactly 0 and NaN where
        either value is missing.
    """
    def numeric(col):
        # Freshly created rows are object dtype; coerce so the arithmetic and
        # rounding below always operate on floats.
        return pd.to_numeric(df[col], errors="coerce")

    def growth(cur_col, pri_col):
        return np.where(pri_col != 0, (cur_col - pri_col) / pri_col.abs(), 0)

    revenue, revenue_prev = numeric("營業收入"), numeric("前一期營業收入")
    income, income_prev = numeric("本期淨利"), numeric("前一期本期淨利")
    eps, eps_prev = numeric("每股盈餘"), numeric("前一期每股盈餘")
    equity, equity_prev = numeric("歸屬於母公司業主之權益合計"), numeric("前一期歸屬於母公司業主之權益合計")
    shares, shares_prev = numeric("已發行股份總數"), numeric("前一期已發行股份總數")
    debt, debt_prev = numeric("負債總計"), numeric("前一期負債總計")
    assets, assets_prev = numeric("資產總計"), numeric("前一期資產總計")

    # Ratios are kept at full precision here and rounded only when stored, so
    # their growth rates are not distorted by rounding (0.0004 vs 0.0006 must
    # not turn into 0.0 vs 0.001).
    roe, roe_prev = income / equity, income_prev / equity_prev
    roa, roa_prev = income / assets, income_prev / assets_prev
    # Equity is multiplied by 1000 before being divided by the share count.
    bvps, bvps_prev = (equity * 1000) / shares, (equity_prev * 1000) / shares_prev
    debt_ratio, debt_ratio_prev = debt / assets, debt_prev / assets_prev

    # --- Profitability ---
    df["營收成長率"] = growth(revenue, revenue_prev).round(dec)
    df["本期淨利成長率"] = growth(income, income_prev).round(dec)
    df["每股盈餘成長率"] = growth(eps, eps_prev).round(dec)
    df['ROE'] = roe.round(dec)
    df['前一期ROE'] = roe_prev.round(dec)
    df['ROE成長率'] = growth(roe, roe_prev).round(dec)
    df['ROA'] = roa.round(dec)
    df['前一期ROA'] = roa_prev.round(dec)
    df["ROA成長率"] = growth(roa, roa_prev).round(dec)

    # --- Book value and leverage ---
    df['每股淨值'] = bvps.round(dec)
    df['前一期每股淨值'] = bvps_prev.round(dec)
    df['每股淨值成長率'] = growth(bvps, bvps_prev).round(dec)
    df["負債比"] = debt_ratio.round(dec)
    df["前一期負債比"] = debt_ratio_prev.round(dec)
    df["負債比成長率"] = growth(debt_ratio, debt_ratio_prev).round(dec)

    return df

In [16]:
def main():
    """Fetch statement figures for auctions that are not yet in fin_stmts.csv.

    Reads bid_info.csv (required) and fin_stmts.csv (optional) from the save
    folder, determines which (證券代號, 投標開始日) pairs are missing from the
    latter, scrapes the closest earlier financial statement for each, computes
    the derived ratios and writes the merged table back to fin_stmts.csv.
    Stocks that fail are retried for up to 5 extra rounds. Nothing is written
    when bid_info.csv is missing or no pair is missing.

    NOTE(review): pairs that still fail after all rounds are saved with empty
    values, so they are no longer part of the difference on the next run —
    confirm this is intended.
    """
    url = "https://mopsov.twse.com.tw/server-java/t164sb01"
    save_folder = Path("/content/drive/MyDrive/Colab Notebooks/stock_auction_pred_project/csv")
    raw_data_path = save_folder / "bid_info.csv"
    fina_stmt_path = save_folder / "fin_stmts.csv"

    def load_csv(path):
        # Codes are read as text in every branch so the keys of both files
        # compare equal and keep their exact spelling.
        frame = pd.read_csv(path, encoding="utf-8-sig", dtype={"證券代號": str})
        frame["投標開始日"] = pd.to_datetime(frame["投標開始日"], format="mixed")
        return frame

    if not raw_data_path.exists():
        return
    rd = load_csv(raw_data_path)
    if fina_stmt_path.exists():
        fs_df = load_csv(fina_stmt_path)
    else:
        fs_df = pd.DataFrame(columns=['證券代號', '投標開始日'] + new_col)

    df_raw_indexed = rd.set_index(['證券代號', '投標開始日'])
    fs_df_indexed = fs_df.set_index(['證券代號', '投標開始日'])
    diff_index = df_raw_indexed.index.difference(fs_df_indexed.index)
    fs_df = fs_df_indexed.reset_index()

    def process_single_stock(code, start, url, headers, fallback_dict):
        """Return ``(True, report)`` on success or ``(False, reason)`` on failure."""
        try:
            # 1. Find the latest report uploaded before the bidding start date.
            search = search_year_season(code, start)
            if search is None:
                return False, "Search result is None"
            print("股號 : ", code, "最接近季度 : ", search)
            y = search[1].split('/s')[0]
            s = search[1].split('/s')[1]
            t = search[2]

            # NOTE(review): the example URLs in this notebook use a Gregorian
            # SYEAR (2024) while `y` is the ROC year returned by
            # search_year_season (e.g. 114) — confirm the endpoint accepts it.
            params = {
                "step": 1,
                "CO_ID": code,
                "SYEAR": y,
                "SSEASON": s,
                "REPORT_ID": "C" if t == "合併" else "A",
            }

            # 2. Request the statement page (timeout avoids waiting forever).
            req = requests.get(url, params=params, headers=headers, timeout=15)
            req.encoding = req.apparent_encoding

            if req.status_code != 200:
                return False, "Rate Limited"

            # 3. Parse the HTML and pull out the figures.
            tree = etree.HTML(req.text)
            report = get_report(tree, fallback_dict)

            # get_report always fills every key, so an unusable page shows up
            # as a report whose values are all None, not as an empty dict.
            if not report or all(value is None for value in report.values()):
                return False, "Empty Report"

            return True, report

        except Exception as e:
            return False, str(e)

    def store_report(rows, code, start, report):
        # Write one scraped report into the row identified by (code, start).
        target = (rows["證券代號"] == code) & (rows["投標開始日"] == start)
        rows.loc[target, list(report.keys())] = list(report.values())

    # --- Main section ---
    if not diff_index.empty:
        new_rows = pd.DataFrame(index=diff_index, columns=fs_df_indexed.columns)
        new_rows = new_rows.reset_index()
        new_rows["投標開始日"] = pd.to_datetime(new_rows["投標開始日"], format='mixed')
        fail = []  # (code, start) pairs that failed in the first pass
        headers = {"User-Agent": "Mozilla/5.0 ..."}

        # --- First pass ---
        for row in new_rows.itertuples():
            code = row.證券代號
            start = row.投標開始日
            print(f"處理股票代號 {code} 中...")
            success, result = process_single_stock(code, start, url, headers, fallback_dict)

            if success:
                report = result
                print(f"抓取成功，股號 : {code}, 投標時間 : {start}")
                print(report)
                print("-" * 30)
                store_report(new_rows, code, start, report)
                time.sleep(random.uniform(3, 6))
            else:
                print(f"股票 : {code}, 失敗 : {result}")
                fail.append((code, start))
                time.sleep(2)

        # --- Retry rounds for the failures ---
        n = 0
        fail_time = 5
        while fail and n < fail_time:
            n += 1
            print(f"失敗第 {n} 輪")
            print(f"失敗列表 : {fail}")

            for code, start in fail[:]:  # iterate over a copy; successes are removed
                print(f"重試股票 {code}...")
                success, result = process_single_stock(code, start, url, headers, fallback_dict)

                if success:
                    report = result
                    store_report(new_rows, code, start, report)
                    fail.remove((code, start))
                    print(f"重試成功: {code}")
                    print(report)
                    print("-" * 30)
                    time.sleep(5)
                else:
                    print(f"重試依舊失敗: {code}, 原因: {result}")
                    time.sleep(2)

        # --- Final consolidation ---
        # Coerce the scraped columns once (they are object dtype, and stay so
        # when every stock failed), then compute the ratios a single time
        # instead of recomputing the whole frame after every stock.
        report_cols = [col for key in fallback_dict for col in (key, f"前一期{key}")]
        new_rows[report_cols] = new_rows[report_cols].apply(pd.to_numeric, errors='coerce')
        new_rows = calculate_ratios(new_rows, 3)

        fs_df = pd.concat([fs_df, new_rows], ignore_index=True)
        fs_df.to_csv(fina_stmt_path, index=False, encoding="utf-8-sig")
        print(f"存檔完成。最終失敗清單: {fail if fail else '無'}")


if __name__ == "__main__":
    main()


處理股票代號 5547 中...
update time search_:  5547 , date :  2026-01-19 00:00:00
<html><head><title>電子資料查詢作業</title><meta http-equiv="Content-Type" content="text/html;charset=big5"><link rel='stylesheet' href='/ppp.css' type='text/css'></head><body>
<style text='text/css'>
<!--
	 TH{color:#000099;font-weight:normal;font-size:10pt}
-->
</style>
<script>
function readfile(kind,coid,filename)
{
	var f = document.forms[0];
	f.step.value = 9;
	f.kind.value = kind;
	f.co_id.value = coid;
	f.filename.value = filename;
	f.submit();
}
var win = null;
function readfile2(kind,coid,filename)
{
	var f = document.forms[0];
	var today = new Date();
	var rand = today.getTime();
	if (win==null || win.closed){
		win = window.open('','download'+rand);
	}
	f.target = 'download'+rand;
	f.step.value = 9;
	f.kind.value = kind;
	f.co_id.value = coid;
	f.filename.value = filename;
	f.submit();
	f.target = this;
}
</script>
<center>
<h2><font color="blue">電子資料查詢作業</font></h2>
<br><h4 align='center'><font color='red'>查

In [17]:
# df1 = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/stock_auction_pred_project/csv/fin_stmts.csv")
# df2 = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/stock_auction_pred_project/csv/rawdata.csv")
# print(df2.columns)


# df1 = df1.drop(df1[df1['證券代號'] == 6907].index)
# df1.drop(columns=['Unnamed: 0'], inplace=True)
# df1
# df1[df1['證券代號']==6907]
# df1.to_csv("/content/drive/MyDrive/Colab Notebooks/stock_auction_pred_project/csv/fin_stmts.csv", encoding="utf-8-sig", index=False)
