In [8]:
# -*- coding: utf-8 -*-
"""TXF-Continuous-Data-Pipeline.ipynb"""

# 0. 安裝必要套件 (Colab 環境專用)
!pip install shioaji

import os
import json
import time
import pandas as pd
import numpy as np
import gspread
import shioaji as sj
from datetime import datetime, timezone, timedelta
from typing import Tuple, Dict, Any

# Google Colab 專用模組
from google.colab import userdata, auth
from google.auth import default
from google.oauth2.service_account import Credentials



In [9]:
# ==========================================
# 1. 全域配置與常數 (Configuration)
# ==========================================

# --- Retry policy for the Shioaji usage query ---
# Exponential backoff: wait RETRY_DELAY_BASE * 2**(attempt-1) seconds -> 1s, 2s, 4s, ...
RETRY_MAX, RETRY_DELAY_BASE = 3, 1

# --- Feature switches ---
FORCE_MXFR1 = True       # always query the continuous near-month code "MXFR1"
TRIM_DATA = True         # NOTE(review): not read anywhere in the code shown here — confirm it is still used
QUERY_BACK_DAYS = 7      # how many calendar days back to request K-bars for

# --- Google Sheets worksheet (tab) names ---
TAB_NAME_SETTLE, TAB_NAME_5MIN, TAB_NAME_60MIN = (
    'TXF_settle_date_price',   # settlement / contract-roll table
    '5mink_new',               # 5-minute bar output
    '60mink_1',                # 60-minute bar output
)

# --- Trading session windows (HH:MM); the night session "N" wraps past midnight ---
MARKET_HOURS = {
    session: {"open": opens_at, "close": closes_at}
    for session, opens_at, closes_at in (
        ("D", "08:45", "13:45"),
        ("N", "15:00", "05:00"),
    )
}

In [10]:
# ==========================================
# 2. 認證與連線工具 (AuthManager)
# ==========================================

class AuthManager:
    """Builds authenticated Google Sheets and Shioaji clients.

    All credentials are read from Colab secrets via ``google.colab.userdata``.
    """

    @staticmethod
    def get_gsheet_client():
        """Create an authorized gspread client from the ``GSHEET_CREDENTIALS`` secret.

        The secret must contain a service-account JSON document with a
        ``private_key`` entry.

        Returns:
            The client returned by ``gspread.authorize``.

        Raises:
            ConnectionError: if the secret is missing/empty, is not valid JSON,
                lacks ``private_key``, or authorization fails. The underlying
                exception is preserved as ``__cause__``.
        """
        try:
            service_account_json = userdata.get('GSHEET_CREDENTIALS')
            if not service_account_json:
                raise ValueError("Secret 'GSHEET_CREDENTIALS' is empty.")

            creds_dict = json.loads(service_account_json)
            # Undo escaped newlines: the stored key may contain literal "\n"
            # sequences instead of real line breaks.
            creds_dict['private_key'] = creds_dict['private_key'].replace('\\n', '\n')

            creds = Credentials.from_service_account_info(
                creds_dict,
                scopes=['https://www.googleapis.com/auth/spreadsheets']
            )
            return gspread.authorize(creds)
        except Exception as e:
            # Chain the original error so the real cause stays in the traceback.
            raise ConnectionError(f"Google Sheet Auth Failed: {e}") from e

    @staticmethod
    def get_shioaji_api(max_retries=RETRY_MAX, base_delay=RETRY_DELAY_BASE):
        """Log into Shioaji and (best-effort) report the current API data usage.

        Args:
            max_retries (int): number of attempts for the usage query.
            base_delay (int): seconds to wait after the first failed attempt;
                the wait doubles after each further failure (1s, 2s, 4s, ...).

        Returns:
            The logged-in ``shioaji.Shioaji`` instance. It is returned even if
            every usage query fails (only a warning is printed in that case).

        Raises:
            ValueError: if ``SHIOAJI_API_KEY`` or ``SHIOAJI_SECRET_KEY`` is empty.
            ConnectionError: if ``api.login`` raises (original error chained).
        """
        api = sj.Shioaji()
        api_key = userdata.get('SHIOAJI_API_KEY')
        secret_key = userdata.get('SHIOAJI_SECRET_KEY')

        if not api_key or not secret_key:
            raise ValueError("Missing SHIOAJI_API_KEY or SHIOAJI_SECRET_KEY in Secrets.")

        print("[Auth] Logging into Shioaji...")
        try:
            api.login(
                api_key=api_key,
                secret_key=secret_key,
                contracts_cb=lambda security_type: print(f"{repr(security_type)} fetch done.")
            )
        except Exception as e:
            raise ConnectionError(f"Shioaji Login Failed: {e}") from e

        # Usage query with exponential backoff; failure here is non-fatal.
        for attempt in range(1, max_retries + 1):
            try:
                usage_bytes = api.usage()['bytes']
                usage_mb = round(usage_bytes / (1024 * 1024), 2)
                # NOTE(review): the 500 MB limit is a hard-coded display value.
                print(f"[Auth] API Usage: {usage_mb} MB / 500 MB")
                break

            except Exception as e:
                if attempt < max_retries:
                    wait_time = base_delay * (2 ** (attempt - 1))
                    print(f"[Auth] 取得用量失敗 (第 {attempt} 次)，{wait_time} 秒後重試... 錯誤原因: {e}")
                    time.sleep(wait_time)
                else:
                    print(f"[Auth] Warning: 無法取得 API 用量數據 (已重試 {max_retries} 次)。錯誤原因: {e}")

        return api

In [11]:
# ==========================================
# 3. 核心邏輯：結算日計算 (SettleManager)
# ==========================================

class SettleManager:
    """Loads the contract-roll (settlement) table and predicts the next contract.

    ``df_config`` is read from the ``TAB_NAME_SETTLE`` worksheet. This class
    reads the columns ``contract_year_month``, ``next_contract_diff``,
    ``accumulated_contract_diff``, ``start_k`` and ``settle_k``.
    """

    def __init__(self, gc):
        """Read the settle table immediately.

        Args:
            gc: an authorized gspread client.

        Raises:
            RuntimeError: if the settle worksheet cannot be read or parsed.
        """
        self.gc = gc
        # Spreadsheet key of the workbook that holds the settle table (Colab secret).
        self.sheet_id = userdata.get('GSHEET_ID_SETTLE')
        self.df_config = self._load_config()

    def _load_config(self) -> pd.DataFrame:
        """Read the settle worksheet into a typed DataFrame.

        The first sheet row is the header. The diff columns are converted to
        numbers and ``start_k`` / ``settle_k`` to datetimes (unparseable cells
        become NaN / NaT). Rows whose ``contract_year_month`` is blank are dropped.

        Raises:
            RuntimeError: wrapping any failure (missing tab or column, empty sheet, ...).
        """
        try:
            worksheet = self.gc.open_by_key(self.sheet_id).worksheet(TAB_NAME_SETTLE)
            data = worksheet.get_all_values()
            df = pd.DataFrame(data[1:], columns=data[0])

            for col in ('next_contract_diff', 'accumulated_contract_diff'):
                df[col] = pd.to_numeric(df[col], errors='coerce')

            for col in ('start_k', 'settle_k'):
                df[col] = pd.to_datetime(df[col], errors='coerce')

            # get_all_values() returns blank cells as '' (not NaN), so dropna()
            # alone keeps blank rows. A blank trailing row would otherwise become
            # iloc[-1] in calculate_next_contract and break strptime.
            has_ym = df['contract_year_month'].astype(str).str.strip() != ''
            return df[has_ym].dropna(subset=['contract_year_month'])
        except Exception as e:
            raise RuntimeError(f"Error loading settle config: {e}") from e

    def calculate_next_contract(self) -> str:
        """Predict the contract after the last configured one and append it in memory.

        - Next month: the month after the last row's ``contract_year_month``.
        - Settle K-bar: the third Wednesday of that month at 13:25.
        - First K-bar: 5 minutes after the previous row's ``settle_k``.
        - ``accumulated_contract_diff``: previous accumulated + previous ``next_contract_diff``.

        The predicted row is appended to ``self.df_config`` only (nothing is
        written to the sheet). This lets K-bars after the last configured
        settlement still be matched. Each call appends another row.

        Returns:
            ``'MXF' + YYYYMM`` of the predicted contract.

        Raises:
            RuntimeError: if ``df_config`` has no rows.
        """
        if self.df_config.empty:
            raise RuntimeError("Settle config is empty; cannot predict the next contract.")

        last_row = self.df_config.iloc[-1]

        # 1. Next contract month. From day 1 of the last month, +31 days always
        #    lands inside the following month.
        last_ym_dt = datetime.strptime(str(last_row['contract_year_month']), '%Y%m')
        new_ym_dt = last_ym_dt + timedelta(days=31)
        new_contract_ym = new_ym_dt.strftime('%Y%m')

        # 2. Third Wednesday. It always falls on day 15-21, so start at day 15
        #    and walk forward to the first Wednesday.
        first_day = datetime(new_ym_dt.year, new_ym_dt.month, 1)
        third_wed = first_day + pd.DateOffset(weeks=2)
        while third_wed.weekday() != 2:
            third_wed += timedelta(days=1)

        new_settle_k = third_wed + timedelta(hours=13, minutes=25)
        new_start_k = last_row['settle_k'] + timedelta(minutes=5)

        # Accumulated back-adjustment for the predicted contract (in memory only).
        new_acc_diff = last_row['accumulated_contract_diff'] + last_row['next_contract_diff']

        print(f"[Info] Current Config End: {last_row['contract_year_month']}")
        print(f"[Info] Predicted Next Contract: {new_contract_ym}, Settle: {new_settle_k}")

        new_row = pd.DataFrame([{
            'contract_year_month': new_contract_ym,
            'accumulated_contract_diff': new_acc_diff,
            'start_k': new_start_k,
            'settle_k': new_settle_k
        }])
        self.df_config = pd.concat([self.df_config, new_row], ignore_index=True)

        return 'MXF' + new_contract_ym

In [12]:
# ==========================================
# 4. 核心邏輯：資料處理 (DataProcessor)
# ==========================================

class DataProcessor:
    """Cleans, resamples, back-adjusts and validates K-bar data.

    All methods are static; the class is only a namespace.
    """

    @staticmethod
    def fetch_and_parse_kbars(api, contract_code: str, days_back: int) -> Tuple[pd.DataFrame, str]:
        """Download raw K-bars from Shioaji covering the last ``days_back`` days.

        Args:
            api: a logged-in ``shioaji.Shioaji`` instance.
            contract_code: preferred contract code (e.g. ``'MXF202601'``). It is
                ignored when ``FORCE_MXFR1`` is True.
            days_back: calendar days before today (UTC+8) to request.

        Returns:
            ``(df, code_used)``. ``df`` is indexed by a sorted ``ts`` datetime index
            and holds the kbar fields except ``Amount``. If no bars come back,
            the empty frame is returned without setting the index.
        """
        # Compute dates in UTC+8 so "today" is independent of the host timezone.
        now = datetime.now(timezone(timedelta(hours=+8)))
        end_date = now.strftime("%Y-%m-%d")
        start_date = (now - timedelta(days=days_back)).strftime('%Y-%m-%d')

        target_code = "MXFR1" if FORCE_MXFR1 else contract_code
        # Fall back to the continuous code when the specific contract lookup is falsy.
        if not FORCE_MXFR1 and not api.Contracts.Futures.MXF[target_code]:
            target_code = "MXFR1"

        print(f"[Action] Fetching {target_code} from {start_date} to {end_date}...")

        contract = api.Contracts.Futures.MXF[target_code]
        kbars = api.kbars(contract, start=start_date, end=end_date)

        df = pd.DataFrame({**kbars}).drop(columns=["Amount"])
        if df.empty:
            return df, target_code

        df['ts'] = pd.to_datetime(df['ts'])
        df = df.set_index("ts").sort_index()

        return df, target_code

    @staticmethod
    def resample_and_split(df_raw: pd.DataFrame, df_settle_config: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Build back-adjusted 5-minute and 60-minute bars with session metadata.

        Steps:
          1. Resample raw bars to 5 minutes.
          2. Split into day (D) and night (N) sessions using ``MARKET_HOURS``.
          3. Resample each session to 60 minutes.
          4. Tag each bar with ``date_market_type`` (``YYMMDD`` + ``D``/``N``). Night
             bars before 05:00 are dated to the previous day.
          5. For each bar whose timestamp lies in a ``[start_k, settle_k]`` window of
             ``df_settle_config``, add that row's ``accumulated_contract_diff`` to
             OHLC and record ``contract_year_month``.

        Bars with no matching window keep diff 0 and an empty
        ``contract_year_month``; a warning with their count is printed.

        Returns:
            ``(df_5m, df_60m)``, each sorted by timestamp (D and N combined).
        """
        ohlcv = {'Open': 'first', 'High': 'max', 'Low': 'min', 'Close': 'last', 'Volume': 'sum'}

        # 1. 5-minute bars. With closed='right' and label='left', the bar stamped
        #    HH:MM aggregates raw rows in (HH:MM, HH:MM+5].
        df_5m = df_raw.resample('5min', label="left", closed='right').agg(ohlcv).dropna()

        # 2. Session split. between_time wraps past midnight when open > close (night session).
        df_5m_D = df_5m.between_time(MARKET_HOURS["D"]["open"], MARKET_HOURS["D"]["close"]).copy()
        df_5m_N = df_5m.between_time(MARKET_HOURS["N"]["open"], MARKET_HOURS["N"]["close"]).copy()

        # 3. 60-minute bars. Day bins are offset by 45 min (08:45, 09:45, ...);
        #    night bins start on the hour.
        df_60m_D = df_5m_D.resample("60min", offset="45min").agg(ohlcv).dropna()
        df_60m_N = df_5m_N.resample("60min").agg(ohlcv).dropna()

        def get_market_date_str(ts, is_night=False):
            """Return the session label, e.g. '260116N' for a bar at 2026-01-17 01:00."""
            target_ts = ts
            # Early-morning night bars (before 05:00) belong to the previous day's night session.
            if is_night and ts.hour < 5:
                target_ts = ts - timedelta(days=1)
            suffix = "N" if is_night else "D"
            return target_ts.strftime("%y%m%d") + suffix

        # 4. Session labels (skip empty frames to avoid assigning onto nothing).
        for df_temp, is_night in [(df_5m_D, False), (df_5m_N, True), (df_60m_D, False), (df_60m_N, True)]:
            if not df_temp.empty:
                df_temp['date_market_type'] = df_temp.index.to_series().apply(lambda x: get_market_date_str(x, is_night))

        # 5. Back-adjustment and contract metadata.
        def process_final_df(df_d, df_n):
            """Merge D/N bars and apply the settle-config price adjustment row by row."""
            df_all = pd.concat([df_d, df_n]).sort_index()

            if df_all.empty:
                return df_all

            df_all['contract_year_month'] = ""
            df_all['accumulated_contract_diff'] = 0

            def enrich_row(row):
                """Adjust one bar using the first settle window containing its timestamp."""
                match = df_settle_config[
                    (row.name >= df_settle_config['start_k']) &
                    (row.name <= df_settle_config['settle_k'])
                ]

                res = row.copy()
                if not match.empty:
                    cfg = match.iloc[0]
                    diff = int(cfg['accumulated_contract_diff'])
                    res['accumulated_contract_diff'] = diff
                    res['contract_year_month'] = cfg['contract_year_month']

                    res['Open'] += diff
                    res['High'] += diff
                    res['Low'] += diff
                    res['Close'] += diff
                return res

            df_enriched = df_all.apply(enrich_row, axis=1)

            # Unmatched bars are left un-adjusted; surface them instead of hiding them.
            unmatched = int((df_enriched['contract_year_month'] == "").sum())
            if unmatched:
                print(f"[Warning] {unmatched} bars fall outside every settle window; left un-adjusted.")
            return df_enriched

        df_5m_final = process_final_df(df_5m_D, df_5m_N)
        df_60m_final = process_final_df(df_60m_D, df_60m_N)

        return df_5m_final, df_60m_final

    @staticmethod
    def check_completeness(df: pd.DataFrame, timeframe: str):
        """Verify every trading session has the expected number of bars.

        Bars are grouped by session: hours 08-13 go to that date's D session;
        hours >= 15 go to that date's N session; hours < 5 go to the previous
        date's N session. Any other hour is ignored. Expected counts are 60/168
        (5min) and 5/14 (60min) bars for D/N.

        Does nothing for an unknown ``timeframe`` or an empty ``df``.

        Raises:
            ValueError: listing every session whose bar count differs from the expectation.
        """
        EXPECTED = {
            '5min':  {'D': 60, 'N': 168},
            '60min': {'D': 5,  'N': 14}
        }

        if timeframe not in EXPECTED or df.empty:
            return

        print(f"[Check] Verifying data completeness for {timeframe}...")
        ts_series = df.index.to_series()

        def get_group_id(ts):
            """Map a bar timestamp to '<YYYY-MM-DD>_<D|N>' or 'UNKNOWN'."""
            if 8 <= ts.hour <= 13:
                return ts.strftime('%Y-%m-%d') + '_D'
            elif ts.hour >= 15:
                return ts.strftime('%Y-%m-%d') + '_N'
            elif ts.hour < 5:
                # Same rule as the session labels: early morning belongs to the previous night.
                prev_date = ts - timedelta(days=1)
                return prev_date.strftime('%Y-%m-%d') + '_N'
            return 'UNKNOWN'

        counts = ts_series.apply(get_group_id).value_counts()

        errors = []
        for group_id, count in counts.items():
            if 'UNKNOWN' in group_id:
                continue
            market_type = group_id.split('_')[-1]
            expected_count = EXPECTED[timeframe][market_type]
            if count != expected_count:
                error_msg = f"  - {group_id}: 預期 {expected_count} 筆, 實際 {count} 筆"
                errors.append(error_msg)

        if errors:
            raise ValueError(f"資料完整性檢查失敗 ({timeframe})，停止上傳！\n" + "\n".join(errors))

        print(f"[Check] {timeframe} Pass. All sessions appear complete.")

In [13]:
# ==========================================
# 5. 上傳工具 (SheetUploader)
# ==========================================

class SheetUploader:
    """Appends new DataFrame rows to a Google Sheets tab without duplicating old ones."""

    @staticmethod
    def _align_to_headers(df: pd.DataFrame, headers: list) -> pd.DataFrame:
        """Reorder ``df`` columns to match the sheet header so every value lands in its column.

        - Header columns that ``df`` lacks but that sit before the last known
          column are filled with ``""``. Without this, later values would shift left.
        - Header columns after the last known one are omitted, so those sheet
          cells are not written.
        - Columns of ``df`` absent from the header are dropped.
        """
        known_positions = [i for i, name in enumerate(headers) if name in df.columns]
        target = headers[: known_positions[-1] + 1] if known_positions else []
        return df.reindex(columns=target, fill_value="")

    @staticmethod
    def _prepare_data(df_new: pd.DataFrame, existing_data: list) -> Tuple[pd.DataFrame, bool]:
        """Select and order the rows of ``df_new`` that should be appended (no I/O).

        ``df_new`` is indexed by timestamp. The index becomes a ``ts`` column
        formatted as ``'%Y-%m-%d %H:%M:%S'``.

        Args:
            df_new: candidate rows.
            existing_data: the sheet contents as a list of rows (header first).

        Returns:
            ``(df_to_upload, needs_header)``.
            - Empty sheet: all rows (``ts`` first), ``needs_header=True``.
            - Header only: all rows, aligned to the header.
            - Otherwise: only rows newer than the latest parseable ``ts`` already in
              the sheet, aligned to the header. An empty frame means nothing to upload.
        """
        df_process = df_new.copy()
        df_process.index.name = 'ts'
        df_process = df_process.reset_index()
        df_process['ts'] = df_process['ts'].dt.strftime('%Y-%m-%d %H:%M:%S')

        # Case A: completely empty sheet (first run) -> write header too.
        if not existing_data:
            cols = ['ts'] + [c for c in df_process.columns if c != 'ts']
            return df_process[cols], True

        # Case B: header row only.
        headers = existing_data[0]
        if len(existing_data) == 1:
            return SheetUploader._align_to_headers(df_process, headers), False

        # Case C: sheet already has data -> append only newer rows.
        try:
            # Fallback: assume the first column holds timestamps when no 'ts' header exists.
            ts_col_idx = headers.index('ts') if 'ts' in headers else 0

            # Use the newest parseable timestamp in the whole column rather than the
            # final row. This tolerates blank trailing rows and manual reordering.
            sheet_ts = pd.to_datetime(
                pd.Series([row[ts_col_idx] if ts_col_idx < len(row) else '' for row in existing_data[1:]]),
                errors='coerce',
            )
            last_ts = sheet_ts.max()
            if pd.isna(last_ts):
                print("[Warning] No parseable timestamp found in existing sheet rows; skipping upload.")
                return pd.DataFrame(), False

            current_ts_series = pd.to_datetime(df_process['ts'])
            df_to_upload = df_process[current_ts_series > last_ts].copy()

            if df_to_upload.empty:
                return pd.DataFrame(), False

            return SheetUploader._align_to_headers(df_to_upload, headers), False

        except Exception as e:
            print(f"[Error] Data preparation failed: {e}")
            return pd.DataFrame(), False

    @staticmethod
    def append_safely(gc, tab_name: str, df_new: pd.DataFrame):
        """Append the new rows of ``df_new`` to worksheet ``tab_name`` of the data workbook.

        The workbook key comes from the ``GSHEET_ID_DATA`` Colab secret. Connection
        and upload failures are printed, not raised; a missing tab is skipped.
        A header row is written first when the sheet is completely empty.
        """
        try:
            sheet_id = userdata.get('GSHEET_ID_DATA')
            worksheet = gc.open_by_key(sheet_id).worksheet(tab_name)
            existing_data = worksheet.get_all_values()
        except gspread.WorksheetNotFound:
            print(f"[Error] Worksheet '{tab_name}' not found. Skipping.")
            return
        except Exception as e:
            print(f"[Error] Google Sheet Connection failed: {e}")
            return

        df_export, needs_header = SheetUploader._prepare_data(df_new, existing_data)

        if df_export.empty:
            print(f"[{tab_name}] No new data to upload.")
            return

        print(f"[{tab_name}] Uploading {len(df_export)} rows...")

        data_to_write = df_export.values.tolist()

        if needs_header:
            print(f"[{tab_name}] Detect empty sheet. Writing headers first.")
            headers = df_export.columns.tolist()
            data_to_write = [headers] + data_to_write

        try:
            worksheet.append_rows(data_to_write)
            print(f"[{tab_name}] Upload Success.")
        except Exception as e:
            print(f"[{tab_name}] Upload Failed: {e}")

In [14]:
# ==========================================
# 6. 主程式 (Main Execution)
# ==========================================

if __name__ == "__main__":
    start_time = time.time()

    try:
        print("=== Automation Started ===")

        # 1. Google Sheets client and settle-table manager.
        gc = AuthManager.get_gsheet_client()
        settle_mgr = SettleManager(gc)

        # 2. Predict the contract to fetch; this also extends settle_mgr.df_config in memory.
        target_contract = settle_mgr.calculate_next_contract()

        # 3. Log into Shioaji and download K-bars. Logout runs in `finally` so the
        #    session is released even when the download raises.
        api = AuthManager.get_shioaji_api()
        try:
            df_raw, used_code = DataProcessor.fetch_and_parse_kbars(api, target_contract, QUERY_BACK_DAYS)
        finally:
            api.logout()

        if not df_raw.empty:
            # 4. Resample, split sessions and back-adjust.
            df_5m, df_60m = DataProcessor.resample_and_split(df_raw, settle_mgr.df_config)

            # Completeness gate: raises ValueError (aborting the upload) if any session is short.
            DataProcessor.check_completeness(df_5m, '5min')
            DataProcessor.check_completeness(df_60m, '60min')

            # Record which Shioaji contract code produced these bars.
            df_5m['MXF_code'] = used_code
            df_60m['MXF_code'] = used_code

            # 5. Append only new rows to the output tabs.
            SheetUploader.append_safely(gc, TAB_NAME_5MIN, df_5m)
            SheetUploader.append_safely(gc, TAB_NAME_60MIN, df_60m)

        else:
            print("[Warning] No data fetched from API.")

        print(f"=== Automation Finished in {round(time.time() - start_time, 2)}s ===")

    except ValueError as ve:
        # Intended for completeness-check failures.
        # NOTE(review): this also catches other ValueErrors, e.g. the missing-secret
        # check in AuthManager.get_shioaji_api, which are then labelled as data errors.
        print(f"\n[DATA INTEGRITY ERROR] -------------------------")
        print(ve)
        print(f"------------------------------------------------")
        print("上傳已終止，請檢查源頭資料狀況。")

    except Exception as e:
        # Any other failure (connection, permissions, parsing, ...).
        print(f"[FATAL ERROR] Script crashed: {e}")

=== Automation Started ===
[Info] Current Config End: 202512
[Info] Predicted Next Contract: 202601, Settle: 2026-01-21 13:25:00
[Auth] Logging into Shioaji...
Response Code: 0 | Event Code: 0 | Info: host '210.59.255.161:80', hostname '210.59.255.161:80' IP 210.59.255.161:80 (host 1 of 1) (host connection attempt 1 of 1) (total connection attempt 1 of 1) | Event: Session up
<SecurityType.Index: 'IND'> fetch done.
<SecurityType.Stock: 'STK'> fetch done.
<SecurityType.Future: 'FUT'> fetch done.
<SecurityType.Option: 'OPT'> fetch done.
Response Code: 200 | Event Code: 16 | Info: APISUB/V1/SYS/CONTRACT | Event: Subscribe or Unsubscribe ok
[Auth] API Usage: 62.32 MB / 500 MB
[Action] Fetching MXFR1 from 2026-01-11 to 2026-01-18...
[Check] Verifying data completeness for 5min...
[Check] 5min Pass. All sessions appear complete.
[Check] Verifying data completeness for 60min...
[Check] 60min Pass. All sessions appear complete.
[5mink_new] No new data to upload.
[60mink_1] No new data to upload